ExternalCatalog.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.MysqlDb;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.test.TestExternalCatalog;
import org.apache.doris.datasource.test.TestExternalDatabase;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.transaction.TransactionManager;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
* The abstract class for all types of external catalogs.
*/
@Data
public abstract class ExternalCatalog
implements CatalogIf<ExternalDatabase<? extends ExternalTable>>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalCatalog.class);
public static final String ENABLE_AUTO_ANALYZE = "enable.auto.analyze";
public static final String DORIS_VERSION = "doris.version";
public static final String DORIS_VERSION_VALUE = Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH;
public static final String USE_META_CACHE = "use_meta_cache";
public static final String CREATE_TIME = "create_time";
public static final boolean DEFAULT_USE_META_CACHE = true;
public static final String FOUND_CONFLICTING = "Found conflicting";
public static final String ONLY_TEST_LOWER_CASE_TABLE_NAMES = "only_test_lower_case_table_names";
// https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/use-rootpolicy-to-access-oss-hdfs?spm=a2c4g.11186623.help-menu-search-28066.d_0
public static final String OOS_ROOT_POLICY = "oss.root_policy";
public static final String SCHEMA_CACHE_TTL_SECOND = "schema.cache.ttl-second";
// -1 means cache with no ttl
public static final int CACHE_NO_TTL = -1;
// 0 means cache is disabled; >0 means cache with ttl;
public static final int CACHE_TTL_DISABLE_CACHE = 0;
// Properties that should not be shown in the `show create catalog` result
public static final Set<String> HIDDEN_PROPERTIES = Sets.newHashSet(
CREATE_TIME,
USE_META_CACHE);
protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
@SerializedName(value = "name")
protected String name;
// TODO: Keep this to compatible with older version meta data. Need to remove after several DORIS versions.
protected String type;
@SerializedName(value = "logType")
protected InitCatalogLog.Type logType;
// save properties of this catalog, such as hive meta store url.
@SerializedName(value = "catalogProperty")
protected CatalogProperty catalogProperty;
@SerializedName(value = "initialized")
protected boolean initialized = false;
@SerializedName(value = "idToDb")
protected Map<Long, ExternalDatabase<? extends ExternalTable>> idToDb = Maps.newConcurrentMap();
@SerializedName(value = "lastUpdateTime")
protected long lastUpdateTime;
// <db name, table name> to tableAutoAnalyzePolicy
@SerializedName(value = "taap")
protected Map<Pair<String, String>, String> tableAutoAnalyzePolicy = Maps.newHashMap();
@SerializedName(value = "comment")
private String comment;
// db name does not contains "default_cluster"
protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
protected boolean invalidCacheInInit = true;
protected ExternalMetadataOps metadataOps;
protected TransactionManager transactionManager;
private ExternalSchemaCache schemaCache;
// A cached and being converted properties for external catalog.
// generated from catalog properties.
private byte[] propLock = new byte[0];
private Map<String, String> convertedProperties = null;
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected PreExecutionAuthenticator preExecutionAuthenticator;
protected ThreadPoolExecutor threadPoolWithPreAuth;
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];
public ExternalCatalog() {
}
public ExternalCatalog(long catalogId, String name, InitCatalogLog.Type logType, String comment) {
this.id = catalogId;
this.name = name;
this.logType = logType;
this.comment = Strings.nullToEmpty(comment);
}
/**
* Initializes the PreExecutionAuthenticator instance.
* This method ensures that the authenticator is created only once in a thread-safe manner.
* If additional authentication logic is required, it should be extended and implemented in subclasses.
*/
protected synchronized void initPreExecutionAuthenticator() {
if (preExecutionAuthenticator == null) {
preExecutionAuthenticator = new PreExecutionAuthenticator();
}
}
public Configuration getConfiguration() {
// build configuration is costly, so we cache it.
if (cachedConf != null) {
return cachedConf;
}
synchronized (confLock) {
if (cachedConf != null) {
return cachedConf;
}
cachedConf = buildConf();
return cachedConf;
}
}
private Configuration buildConf() {
Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
return conf;
}
/**
* set some default properties when creating catalog
*
* @return list of database names in this catalog
*/
protected List<String> listDatabaseNames() {
if (metadataOps == null) {
throw new UnsupportedOperationException("Unsupported operation: "
+ "listDatabaseNames from remote client when init catalog with " + logType.name());
} else {
return metadataOps.listDatabaseNames();
}
}
public ExternalMetadataOps getMetadataOps() {
makeSureInitialized();
return metadataOps;
}
// Will be called when creating catalog(so when as replaying)
// to add some default properties if missing.
public void setDefaultPropsIfMissing(boolean isReplay) {
if (catalogProperty.getOrDefault(USE_META_CACHE, "").isEmpty()) {
// If not setting USE_META_CACHE in replay logic,
// set default value to false to be compatible with older version meta data.
catalogProperty.addProperty(USE_META_CACHE, isReplay ? "false" : String.valueOf(DEFAULT_USE_META_CACHE));
}
useMetaCache = Optional.of(
Boolean.valueOf(catalogProperty.getOrDefault(USE_META_CACHE, String.valueOf(DEFAULT_USE_META_CACHE))));
}
// we need check auth fallback for kerberos or simple
public boolean ifNotSetFallbackToSimpleAuth() {
return catalogProperty.getOrDefault(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "").isEmpty();
}
// Will be called when creating catalog(not replaying).
// Subclass can override this method to do some check when creating catalog.
public void checkWhenCreating() throws DdlException {
}
/**
* @param dbName
* @return names of tables in specified database
*/
public abstract List<String> listTableNames(SessionContext ctx, String dbName);
/**
* check if the specified table exist.
*
* @param dbName
* @param tblName
* @return true if table exists, false otherwise
*/
public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);
/**
* init some local objects such as:
* hms client, read properties from hive-site.xml, es client
*/
protected abstract void initLocalObjectsImpl();
/**
* check if the specified table exist in doris.
* Currently only be used for hms event handler.
*
* @param dbName
* @param tblName
* @return true if table exists, false otherwise
*/
public boolean tableExistInLocal(String dbName, String tblName) {
throw new NotImplementedException("tableExistInLocal not implemented");
}
/**
* Catalog can't be init when creating because the external catalog may depend on third system.
* So you have to make sure the client of third system is initialized before any method was called.
*/
public final synchronized void makeSureInitialized() {
initLocalObjects();
if (!initialized) {
if (useMetaCache.get()) {
if (metaCache == null) {
metaCache = Env.getCurrentEnv().getExtMetaCacheMgr().buildMetaCache(
name,
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_meta_object_cache_num,
ignored -> getFilteredDatabaseNames(),
localDbName -> Optional.ofNullable(
buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType,
true)),
(key, value, cause) -> value.ifPresent(v -> v.setUnInitialized(invalidCacheInInit)));
}
setLastUpdateTime(System.currentTimeMillis());
} else {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
} catch (Exception e) {
Util.logAndThrowRuntimeException(LOG,
String.format("failed to forward init catalog %s operation to master.", name), e);
}
return;
}
init();
}
initialized = true;
}
}
protected final void initLocalObjects() {
if (!objectCreated) {
initLocalObjectsImpl();
objectCreated = true;
}
}
public boolean isInitialized() {
return this.initialized;
}
// check if all required properties are set when creating catalog
public void checkProperties() throws DdlException {
// check refresh parameter of catalog
Map<String, String> properties = getCatalogProperty().getProperties();
if (properties.containsKey(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC)) {
try {
int metadataRefreshIntervalSec = Integer.parseInt(
properties.get(CatalogMgr.METADATA_REFRESH_INTERVAL_SEC));
if (metadataRefreshIntervalSec < 0) {
throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC);
}
} catch (NumberFormatException e) {
throw new DdlException("Invalid properties: " + CatalogMgr.METADATA_REFRESH_INTERVAL_SEC);
}
}
// check schema.cache.ttl-second parameter
String schemaCacheTtlSecond = catalogProperty.getOrDefault(SCHEMA_CACHE_TTL_SECOND, null);
if (java.util.Objects.nonNull(schemaCacheTtlSecond) && NumberUtils.toInt(schemaCacheTtlSecond, CACHE_NO_TTL)
< CACHE_TTL_DISABLE_CACHE) {
throw new DdlException(
"The parameter " + SCHEMA_CACHE_TTL_SECOND + " is wrong, value is " + schemaCacheTtlSecond);
}
}
/**
* eg:
* (
* ""access_controller.class" = "org.apache.doris.mysql.privilege.RangerHiveAccessControllerFactory",
* "access_controller.properties.prop1" = "xxx",
* "access_controller.properties.prop2" = "yyy",
* )
* <p>
* isDryRun: if true, it will try to create the custom access controller, but will not add it to the access manager.
*/
public void initAccessController(boolean isDryRun) {
Map<String, String> properties = getCatalogProperty().getProperties();
// 1. get access controller class name
String className = properties.getOrDefault(CatalogMgr.ACCESS_CONTROLLER_CLASS_PROP, "");
if (Strings.isNullOrEmpty(className)) {
// not set access controller, use internal access controller
return;
}
// 2. get access controller properties
Map<String, String> acProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(CatalogMgr.ACCESS_CONTROLLER_PROPERTY_PREFIX_PROP)) {
acProperties.put(
StringUtils.removeStart(entry.getKey(), CatalogMgr.ACCESS_CONTROLLER_PROPERTY_PREFIX_PROP),
entry.getValue());
}
}
// 3. create access controller
Env.getCurrentEnv().getAccessManager().createAccessController(name, className, acProperties, isDryRun);
}
// init schema related objects
private void init() {
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
InitCatalogLog initCatalogLog = new InitCatalogLog();
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(logType);
List<Pair<String, String>> remoteToLocalPairs = getFilteredDatabaseNames();
for (Pair<String, String> pair : remoteToLocalPairs) {
String remoteDbName = pair.key();
String localDbName = pair.value();
long dbId;
if (dbNameToId != null && dbNameToId.containsKey(localDbName)) {
dbId = dbNameToId.get(localDbName);
tmpDbNameToId.put(localDbName, dbId);
ExternalDatabase<? extends ExternalTable> db = idToDb.get(dbId);
// If the remote name is missing during upgrade, all databases in the Map will be reinitialized.
if (Strings.isNullOrEmpty(db.getRemoteName())) {
db.setRemoteName(remoteDbName);
}
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId, remoteDbName);
} else {
dbId = Env.getCurrentEnv().getNextId();
tmpDbNameToId.put(localDbName, dbId);
ExternalDatabase<? extends ExternalTable> db =
buildDbForInit(remoteDbName, localDbName, dbId, logType, false);
tmpIdToDb.put(dbId, db);
initCatalogLog.addCreateDb(dbId, localDbName, remoteDbName);
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
lastUpdateTime = System.currentTimeMillis();
initCatalogLog.setLastUpdateTime(lastUpdateTime);
Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
}
/**
* Retrieves a filtered list of database names and their corresponding local database names.
* The method applies to include and exclude filters based on the database properties, ensuring
* only the relevant databases are included for further operations.
* <p>
* The method also handles conflicts in database names under case-insensitive conditions
* and throws an exception if such conflicts are detected.
* <p>
* Steps:
* 1. Fetch all database names from the remote source.
* 2. Apply to include and exclude database filters:
* - Exclude filters take precedence over include filters.
* - If a database is in the exclude list, it is ignored.
* - If a database is not in the include list and the include list is not empty, it is ignored.
* 3. Map the filtered remote database names to local database names.
* 4. Handle conflicts when `lower_case_meta_names` is enabled:
* - Detect cases where multiple remote database names map to the same lower-cased local name.
* - Throw an exception if conflicts are found.
*
* @return A list of pairs where each pair contains the remote database name and local database name.
* @throws RuntimeException if there are conflicting database names under case-insensitive conditions.
*/
@NotNull
private List<Pair<String, String>> getFilteredDatabaseNames() {
List<String> allDatabases = Lists.newArrayList(listDatabaseNames());
allDatabases.remove(InfoSchemaDb.DATABASE_NAME);
allDatabases.add(InfoSchemaDb.DATABASE_NAME);
allDatabases.remove(MysqlDb.DATABASE_NAME);
allDatabases.add(MysqlDb.DATABASE_NAME);
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
List<Pair<String, String>> remoteToLocalPairs = Lists.newArrayList();
allDatabases = allDatabases.stream().filter(dbName -> {
if (!dbName.equals(InfoSchemaDb.DATABASE_NAME) && !dbName.equals(MysqlDb.DATABASE_NAME)) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
return false;
}
if (!includeDatabaseMap.isEmpty() && !includeDatabaseMap.containsKey(dbName)) {
return false;
}
}
return true;
}).collect(Collectors.toList());
for (String remoteDbName : allDatabases) {
String localDbName = fromRemoteDatabaseName(remoteDbName);
remoteToLocalPairs.add(Pair.of(remoteDbName, localDbName));
}
// Check for conflicts when lower_case_meta_names = true
if (Boolean.parseBoolean(getLowerCaseMetaNames())) {
// Map to track lowercase local names and their corresponding remote names
Map<String, List<String>> lowerCaseToRemoteNames = Maps.newHashMap();
// Collect lowercased local names and their remote counterparts
for (Pair<String, String> pair : remoteToLocalPairs) {
String lowerCaseLocalName = pair.second.toLowerCase();
lowerCaseToRemoteNames.computeIfAbsent(lowerCaseLocalName, k -> Lists.newArrayList()).add(pair.first);
}
// Identify conflicts: multiple remote names mapping to the same lowercase local name
List<String> conflicts = lowerCaseToRemoteNames.values().stream()
.filter(remoteNames -> remoteNames.size() > 1) // Conflict: more than one remote name
.flatMap(List::stream) // Collect all conflicting remote names
.collect(Collectors.toList());
// Throw exception if conflicts are found
if (!conflicts.isEmpty()) {
throw new RuntimeException(String.format(
FOUND_CONFLICTING + " database names under case-insensitive conditions. "
+ "Conflicting remote database names: %s in catalog %s. "
+ "Please use meta_names_mapping to handle name mapping.",
String.join(", ", conflicts), name));
}
}
return remoteToLocalPairs;
}
/**
* Resets the Catalog state to uninitialized, releases resources held by {@code initLocalObjectsImpl()}
* <p>
* This method is typically invoked during operations such as {@code CREATE CATALOG}
* and {@code MODIFY CATALOG}. It marks the object as uninitialized, clears cached
* configurations, and ensures that resources allocated during {@link #initLocalObjectsImpl()}
* are properly released via {@link #onClose()}
* </p>
* <p>
* The {@code onClose()} method is responsible for cleaning up resources that were initialized
* in {@code initLocalObjectsImpl()}, preventing potential resource leaks.
* </p>
*
* @param invalidCache if {@code true}, the catalog cache will be invalidated
* and reloaded during the refresh process.
*/
public void resetToUninitialized(boolean invalidCache) {
this.objectCreated = false;
this.initialized = false;
synchronized (this.propLock) {
this.convertedProperties = null;
}
synchronized (this.confLock) {
this.cachedConf = null;
}
onClose();
refreshOnlyCatalogCache(invalidCache);
}
public void onRefreshCache(boolean invalidCache) {
refreshOnlyCatalogCache(invalidCache);
}
private void refreshOnlyCatalogCache(boolean invalidCache) {
if (useMetaCache.isPresent()) {
if (useMetaCache.get() && metaCache != null) {
metaCache.invalidateAll();
} else if (!useMetaCache.get()) {
this.initialized = false;
for (ExternalDatabase<? extends ExternalTable> db : idToDb.values()) {
db.setUnInitialized(invalidCache);
}
}
}
this.invalidCacheInInit = invalidCache;
if (invalidCache) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
}
}
public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
makeSureInitialized();
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(key.getDbName());
if (db.isPresent()) {
Optional<? extends ExternalTable> table = db.get().getTable(key.getTblName());
if (table.isPresent()) {
return table.get().initSchemaAndUpdateTime(key);
}
}
return Optional.empty();
}
@Override
public long getId() {
return id;
}
@Override
public String getName() {
return name;
}
@Override
public String getType() {
return logType.name().toLowerCase(Locale.ROOT);
}
@Override
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
/**
* Different from 'listDatabases()', this method will return dbnames from cache.
* while 'listDatabases()' will return dbnames from remote datasource.
*
* @return names of database in this catalog.
*/
@Override
public List<String> getDbNames() {
makeSureInitialized();
if (useMetaCache.get()) {
return metaCache.listNames();
} else {
return new ArrayList<>(dbNameToId.keySet());
}
}
@Override
public List<String> getDbNamesOrEmpty() {
if (initialized) {
try {
return getDbNames();
} catch (Exception e) {
LOG.warn("failed to get db names in catalog {}", getName(), e);
return Lists.newArrayList();
}
} else {
return Lists.newArrayList();
}
}
@Override
public TableName getTableNameByTableId(Long tableId) {
throw new UnsupportedOperationException("External catalog does not support getTableNameByTableId() method."
+ ", table id: " + tableId);
}
@Override
public String getResource() {
return catalogProperty.getResource();
}
@Nullable
@Override
public ExternalDatabase<? extends ExternalTable> getDbNullable(String dbName) {
if (StringUtils.isEmpty(dbName)) {
return null;
}
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("failed to get db {} in catalog {}", dbName, name, e);
return null;
}
String realDbName = ClusterNamespace.getNameFromFullName(dbName);
// information_schema db name is case-insensitive.
// So, we first convert it to standard database name.
if (realDbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
realDbName = InfoSchemaDb.DATABASE_NAME;
} else if (realDbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME)) {
realDbName = MysqlDb.DATABASE_NAME;
}
if (useMetaCache.get()) {
// must use full qualified name to generate id.
// otherwise, if 2 catalogs have the same db name, the id will be the same.
return metaCache.getMetaObj(realDbName, Util.genIdByName(name, realDbName)).orElse(null);
} else {
if (dbNameToId.containsKey(realDbName)) {
return idToDb.get(dbNameToId.get(realDbName));
}
return null;
}
}
@Nullable
@Override
public ExternalDatabase<? extends ExternalTable> getDbNullable(long dbId) {
try {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("failed to get db {} in catalog {}", dbId, name, e);
return null;
}
if (useMetaCache.get()) {
return metaCache.getMetaObjById(dbId).orElse(null);
} else {
return idToDb.get(dbId);
}
}
@Override
public List<Long> getDbIds() {
makeSureInitialized();
if (useMetaCache.get()) {
return getAllDbs().stream().map(DatabaseIf::getId).collect(Collectors.toList());
} else {
return Lists.newArrayList(dbNameToId.values());
}
}
@Override
public Map<String, String> getProperties() {
// convert properties may be a heavy operation, so we cache the result.
if (convertedProperties != null) {
return convertedProperties;
}
synchronized (propLock) {
if (convertedProperties != null) {
return convertedProperties;
}
convertedProperties = PropertyConverter.convertToMetaProperties(catalogProperty.getProperties());
return convertedProperties;
}
}
@Override
public void modifyCatalogName(String name) {
this.name = name;
}
@Override
public void modifyCatalogProps(Map<String, String> props) {
catalogProperty.modifyCatalogProps(props);
notifyPropertiesUpdated(props);
}
public void tryModifyCatalogProps(Map<String, String> props) {
catalogProperty.modifyCatalogProps(props);
}
public void rollBackCatalogProps(Map<String, String> props) {
catalogProperty.rollBackCatalogProps(props);
}
public long getLastUpdateTime() {
return lastUpdateTime;
}
public void setLastUpdateTime(long lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
@Override
public void onClose() {
removeAccessController();
if (threadPoolWithPreAuth != null) {
ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
}
if (null != preExecutionAuthenticator) {
preExecutionAuthenticator = null;
}
if (null != transactionManager) {
transactionManager = null;
}
}
private void removeAccessController() {
Env.getCurrentEnv().getAccessManager().removeAccessController(name);
}
public void replayInitCatalog(InitCatalogLog log) {
// If the remote name is missing during upgrade, or
// the refresh db's remote name is empty,
// all databases in the Map will be reinitialized.
if ((log.getCreateCount() > 0 && (log.getRemoteDbNames() == null || log.getRemoteDbNames().isEmpty()))
|| (log.getRefreshCount() > 0
&& (log.getRefreshRemoteDbNames() == null || log.getRefreshRemoteDbNames().isEmpty()))) {
dbNameToId = Maps.newConcurrentMap();
idToDb = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
initialized = false;
return;
}
Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
Map<Long, ExternalDatabase<? extends ExternalTable>> tmpIdToDb = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
Optional<ExternalDatabase<? extends ExternalTable>> db = getDbForReplay(log.getRefreshDbIds().get(i));
// Should not return null.
// Because replyInitCatalog can only be called when `use_meta_cache` is false.
// And if `use_meta_cache` is false, getDbForReplay() will not return null
if (!db.isPresent()) {
LOG.warn("met invalid db id {} in replayInitCatalog, catalog: {}, ignore it to skip bug.",
log.getRefreshDbIds().get(i), name);
continue;
}
db.get().setRemoteName(log.getRefreshRemoteDbNames().get(i));
Preconditions.checkNotNull(db.get());
tmpDbNameToId.put(db.get().getFullName(), db.get().getId());
tmpIdToDb.put(db.get().getId(), db.get());
LOG.info("Synchronized database (refresh): [Name: {}, ID: {}]", db.get().getFullName(), db.get().getId());
}
for (int i = 0; i < log.getCreateCount(); i++) {
ExternalDatabase<? extends ExternalTable> db =
buildDbForInit(log.getRemoteDbNames().get(i), log.getCreateDbNames().get(i),
log.getCreateDbIds().get(i), log.getType(), false);
if (db != null) {
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
LOG.info("Synchronized database (create): [Name: {}, ID: {}, Remote Name: {}]",
db.getFullName(), db.getId(), log.getRemoteDbNames().get(i));
}
}
// Check whether the remoteName of db in tmpIdToDb is empty
for (ExternalDatabase<? extends ExternalTable> db : tmpIdToDb.values()) {
if (Strings.isNullOrEmpty(db.getRemoteName())) {
LOG.info("Database [{}] remoteName is empty in catalog [{}], mark as uninitialized",
db.getFullName(), name);
dbNameToId = Maps.newConcurrentMap();
idToDb = Maps.newConcurrentMap();
lastUpdateTime = log.getLastUpdateTime();
initialized = false;
return;
}
}
dbNameToId = tmpDbNameToId;
idToDb = tmpIdToDb;
lastUpdateTime = log.getLastUpdateTime();
initialized = true;
}
public Optional<ExternalDatabase<? extends ExternalTable>> getDbForReplay(long dbId) {
if (useMetaCache.get()) {
if (!isInitialized()) {
return Optional.empty();
}
return metaCache.getMetaObjById(dbId);
} else {
return Optional.ofNullable(idToDb.get(dbId));
}
}
/**
* Build a database instance.
* If checkExists is true, it will check if the database exists in the remote system.
*
* @param remoteDbName
* @param dbId
* @param logType
* @param checkExists
* @return
*/
protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String remoteDbName, String localDbName,
long dbId, InitCatalogLog.Type logType, boolean checkExists) {
// Step 1: Map local database name if not already provided
if (localDbName == null && remoteDbName != null) {
localDbName = fromRemoteDatabaseName(remoteDbName);
}
// Step 2:
// When running ut, disable this check to make ut pass.
// Because in ut, the database is not created in remote system.
if (checkExists && (!FeConstants.runningUnitTest || this instanceof TestExternalCatalog)) {
try {
List<String> dbNames = getDbNames();
if (!dbNames.contains(localDbName)) {
dbNames = getFilteredDatabaseNames().stream()
.map(Pair::value)
.collect(Collectors.toList());
if (!dbNames.contains(localDbName)) {
LOG.warn("Database {} does not exist in the remote system. Skipping initialization.",
localDbName);
return null;
}
}
} catch (RuntimeException e) {
// Handle "Found conflicting" exception explicitly
if (e.getMessage().contains(FOUND_CONFLICTING)) {
LOG.error(e.getMessage());
throw e; // Rethrow to let the caller handle this critical issue
} else {
// Any errors other than name conflicts, we default to not finding the database
LOG.warn("Failed to check db {} exist in remote system, ignore it.", localDbName, e);
return null;
}
} catch (Exception e) {
// If connection failed, it will throw exception.
// ignore it and treat it as not exist.
LOG.warn("Failed to check db {} exist in remote system, ignore it.", localDbName, e);
return null;
}
}
// Step 3: Resolve remote database name if using meta cache
if (remoteDbName == null && useMetaCache.orElse(false)) {
if (Boolean.parseBoolean(getLowerCaseMetaNames()) || !Strings.isNullOrEmpty(getMetaNamesMapping())) {
remoteDbName = metaCache.getRemoteName(localDbName);
if (remoteDbName == null) {
LOG.warn("Could not resolve remote database name for local database: {}", localDbName);
return null;
}
} else {
remoteDbName = localDbName;
}
}
// Step 4: Instantiate the appropriate ExternalDatabase based on logType
if (localDbName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME)) {
return new ExternalInfoSchemaDatabase(this, dbId);
}
if (localDbName.equalsIgnoreCase(MysqlDb.DATABASE_NAME)) {
return new ExternalMysqlDatabase(this, dbId);
}
switch (logType) {
case HMS:
return new HMSExternalDatabase(this, dbId, localDbName, remoteDbName);
case ES:
return new EsExternalDatabase(this, dbId, localDbName, remoteDbName);
case JDBC:
return new JdbcExternalDatabase(this, dbId, localDbName, remoteDbName);
case ICEBERG:
return new IcebergExternalDatabase(this, dbId, localDbName, remoteDbName);
case MAX_COMPUTE:
return new MaxComputeExternalDatabase(this, dbId, localDbName, remoteDbName);
case LAKESOUL:
return new LakeSoulExternalDatabase(this, dbId, localDbName, remoteDbName);
case TEST:
return new TestExternalDatabase(this, dbId, localDbName, remoteDbName);
case PAIMON:
return new PaimonExternalDatabase(this, dbId, localDbName, remoteDbName);
case TRINO_CONNECTOR:
return new TrinoConnectorExternalDatabase(this, dbId, localDbName, remoteDbName);
default:
break;
}
return null;
}
public static ExternalCatalog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ExternalCatalog.class);
}
@Override
public void gsonPostProcess() throws IOException {
if (idToDb == null) {
// ExternalCatalog is loaded from meta with older version
idToDb = Maps.newConcurrentMap();
}
dbNameToId = Maps.newConcurrentMap();
for (ExternalDatabase<? extends ExternalTable> db : idToDb.values()) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
db.setExtCatalog(this);
db.setTableExtCatalog(this);
}
objectCreated = false;
// TODO: This code is to compatible with older version of metadata.
// Could only remove after all users upgrate to the new version.
if (logType == null) {
if (type == null) {
logType = InitCatalogLog.Type.UNKNOWN;
} else {
try {
logType = InitCatalogLog.Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch (Exception e) {
logType = InitCatalogLog.Type.UNKNOWN;
}
}
}
this.propLock = new byte[0];
this.confLock = new byte[0];
this.initialized = false;
setDefaultPropsIfMissing(true);
if (tableAutoAnalyzePolicy == null) {
tableAutoAnalyzePolicy = Maps.newHashMap();
}
}
public void addDatabaseForTest(ExternalDatabase<? extends ExternalTable> db) {
idToDb.put(db.getId(), db);
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}
@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("createDb not implemented");
return;
}
try {
metadataOps.createDb(stmt);
CreateDbInfo info = new CreateDbInfo(getName(), stmt.getFullDbName(), null);
Env.getCurrentEnv().getEditLog().logCreateDb(info);
} catch (Exception e) {
LOG.warn("Failed to create database {} in catalog {}.", stmt.getFullDbName(), getName(), e);
throw e;
}
}
@Override
public void createDb(CreateDatabaseCommand command) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("createDb not implemented");
return;
}
try {
metadataOps.createDb(command);
CreateDbInfo info = new CreateDbInfo(getName(), command.getDbName(), null);
Env.getCurrentEnv().getEditLog().logCreateDb(info);
} catch (Exception e) {
LOG.warn("Failed to create database {} in catalog {}.", command.getDbName(), getName(), e);
throw e;
}
}
public void replayCreateDb(String dbName) {
if (metadataOps != null) {
metadataOps.afterCreateDb(dbName);
}
}
@Override
public void dropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("dropDb not implemented");
return;
}
try {
metadataOps.dropDb(getName(), dbName, ifExists, force);
DropDbInfo info = new DropDbInfo(getName(), dbName);
Env.getCurrentEnv().getEditLog().logDropDb(info);
} catch (Exception e) {
LOG.warn("Failed to drop database {} in catalog {}", dbName, getName(), e);
throw e;
}
}
public void replayDropDb(String dbName) {
if (metadataOps != null) {
metadataOps.afterDropDb(dbName);
}
}
@Override
public boolean createTable(CreateTableStmt stmt) throws UserException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("createTable not implemented");
return false;
}
try {
boolean res = metadataOps.createTable(stmt);
if (!res) {
// res == false means the table does not exist before, and we create it.
CreateTableInfo info = new CreateTableInfo(getName(), stmt.getDbName(), stmt.getTableName());
Env.getCurrentEnv().getEditLog().logCreateTable(info);
}
return res;
} catch (Exception e) {
LOG.warn("Failed to create a table.", e);
throw e;
}
}
public void replayCreateTable(String dbName, String tblName) {
if (metadataOps != null) {
metadataOps.afterCreateTable(dbName, tblName);
}
}
@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
if (stmt == null) {
throw new DdlException("DropTableStmt is null");
}
dropTable(stmt.getDbName(), stmt.getTableName(), stmt.isView(), stmt.isMaterializedView(), stmt.isSetIfExists(),
stmt.isForceDrop());
}
@Override
public void dropTable(String dbName, String tableName, boolean isView, boolean isMtmv, boolean ifExists,
boolean force) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("dropTable not implemented");
return;
}
try {
metadataOps.dropTable(dbName, tableName, ifExists);
DropInfo info = new DropInfo(getName(), dbName, tableName);
Env.getCurrentEnv().getEditLog().logDropTable(info);
} catch (Exception e) {
LOG.warn("Failed to drop a table", e);
throw e;
}
}
public void replayDropTable(String dbName, String tblName) {
if (metadataOps != null) {
metadataOps.afterDropTable(dbName, tblName);
}
}
public void unregisterDatabase(String dbName) {
throw new NotImplementedException("unregisterDatabase not implemented");
}
public void registerDatabase(long dbId, String dbName) {
throw new NotImplementedException("registerDatabase not implemented");
}
protected Map<String, Boolean> getIncludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.INCLUDE_DATABASE_LIST);
}
protected Map<String, Boolean> getExcludeDatabaseMap() {
return getSpecifiedDatabaseMap(Resource.EXCLUDE_DATABASE_LIST);
}
private Map<String, Boolean> getSpecifiedDatabaseMap(String catalogPropertyKey) {
String specifiedDatabaseList = catalogProperty.getOrDefault(catalogPropertyKey, "");
Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
specifiedDatabaseList = specifiedDatabaseList.trim();
if (specifiedDatabaseList.isEmpty()) {
return specifiedDatabaseMap;
}
String[] databaseList = specifiedDatabaseList.split(",");
for (String database : databaseList) {
String dbname = database.trim();
if (!dbname.isEmpty()) {
specifiedDatabaseMap.put(dbname, true);
}
}
return specifiedDatabaseMap;
}
public String getLowerCaseMetaNames() {
return catalogProperty.getOrDefault(Resource.LOWER_CASE_META_NAMES, "false");
}
public int getOnlyTestLowerCaseTableNames() {
return Integer.parseInt(catalogProperty.getOrDefault(ONLY_TEST_LOWER_CASE_TABLE_NAMES, "0"));
}
public String getMetaNamesMapping() {
return catalogProperty.getOrDefault(Resource.META_NAMES_MAPPING, "");
}
public String bindBrokerName() {
return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
}
// ATTN: this method only return all cached databases.
// will not visit remote datasource's metadata
@Override
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
makeSureInitialized();
if (useMetaCache.get()) {
Set<DatabaseIf<? extends TableIf>> dbs = Sets.newHashSet();
List<String> dbNames = getDbNames();
for (String dbName : dbNames) {
ExternalDatabase<? extends ExternalTable> db = getDbNullable(dbName);
if (db != null) {
dbs.add(db);
}
}
return dbs;
} else {
return new HashSet<>(idToDb.values());
}
}
@Override
public boolean enableAutoAnalyze() {
// By default, external catalog disables auto analyze, users could set catalog property to enable it:
// "enable.auto.analyze" = "true"
Map<String, String> properties = catalogProperty.getProperties();
boolean ret = false;
if (properties.containsKey(ENABLE_AUTO_ANALYZE)
&& properties.get(ENABLE_AUTO_ANALYZE).equalsIgnoreCase("true")) {
ret = true;
}
return ret;
}
@Override
public void truncateTable(TruncateTableStmt stmt) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
throw new UnsupportedOperationException("Truncate table not supported in " + getName());
}
try {
TableRef tableRef = stmt.getTblRef();
TableName tableName = tableRef.getName();
// delete all table data if null
List<String> partitions = null;
if (tableRef.getPartitionNames() != null) {
partitions = tableRef.getPartitionNames().getPartitionNames();
}
metadataOps.truncateTable(tableName.getDb(), tableName.getTbl(), partitions);
TruncateTableInfo info = new TruncateTableInfo(getName(), tableName.getDb(), tableName.getTbl(),
partitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (Exception e) {
LOG.warn("Failed to truncate table {}.{} in catalog {}", stmt.getTblRef().getName().getDb(),
stmt.getTblRef().getName().getTbl(), getName(), e);
throw e;
}
}
@Override
public void truncateTable(TruncateTableCommand command) throws DdlException {
makeSureInitialized();
if (metadataOps == null) {
throw new UnsupportedOperationException("Truncate table not supported in " + getName());
}
try {
String db = command.getTableNameInfo().getDb();
String tbl = command.getTableNameInfo().getTbl();
// delete all table data if null
List<String> partitions = null;
if (command.getPartitionNamesInfo().isPresent()) {
partitions = command.getPartitionNamesInfo().get().getPartitionNames();
}
metadataOps.truncateTable(db, tbl, partitions);
TruncateTableInfo info = new TruncateTableInfo(getName(), db, tbl, partitions);
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
} catch (Exception e) {
LOG.warn("Failed to truncate table {}.{} in catalog {}", command.getTableNameInfo().getDb(),
command.getTableNameInfo().getTbl(), getName(), e);
throw e;
}
}
public void replayTruncateTable(TruncateTableInfo info) {
if (metadataOps != null) {
metadataOps.afterTruncateTable(info.getDb(), info.getTable());
}
}
public void setAutoAnalyzePolicy(String dbName, String tableName, String policy) {
Pair<String, String> key = Pair.of(dbName, tableName);
if (policy == null) {
tableAutoAnalyzePolicy.remove(key);
} else {
tableAutoAnalyzePolicy.put(key, policy);
}
}
public PreExecutionAuthenticator getPreExecutionAuthenticator() {
if (null == preExecutionAuthenticator) {
throw new RuntimeException("PreExecutionAuthenticator is null, please confirm it is initialized.");
}
return preExecutionAuthenticator;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ExternalCatalog)) {
return false;
}
ExternalCatalog that = (ExternalCatalog) o;
return Objects.equal(name, that.name);
}
@Override
public int hashCode() {
return Objects.hashCode(name);
}
@Override
public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
CatalogIf.super.notifyPropertiesUpdated(updatedProps);
String schemaCacheTtl = updatedProps.getOrDefault(SCHEMA_CACHE_TTL_SECOND, null);
if (java.util.Objects.nonNull(schemaCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidSchemaCache(id);
}
}
}