// HMSExternalCatalog.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.fsv2.FileSystemProvider;
import org.apache.doris.fsv2.FileSystemProviderImpl;
import org.apache.doris.fsv2.remote.dfs.DFSFileSystem;
import org.apache.doris.transaction.TransactionManagerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * External catalog for hive metastore compatible data sources.
 */
public class HMSExternalCatalog extends ExternalCatalog {
    private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);

    // Catalog property key for the file meta cache TTL (seconds, per the key name).
    // Validated in checkProperties().
    public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
    // Catalog property key for the partition cache TTL (seconds, per the key name).
    // Validated in checkProperties().
    public static final String PARTITION_CACHE_TTL_SECOND = "partition.cache.ttl-second";
    // Catalog property key: name of the broker used for file split and query scan.
    public static final String BIND_BROKER_NAME = "broker.name";
    // Defaults to false. If set to true, the table schema is read from the "remoteTable" object instead of
    // from the hive metastore.
    // Because of a forward compatibility issue in some hive metastore versions, a
    // "storage schema reading not support" error may be thrown; setting this to true avoids that error.
    // Note that when set to true, the default values of columns are ignored, because default values
    // cannot be obtained from the remoteTable object.
    public static final String GET_SCHEMA_FROM_TABLE = "get_schema_from_table";

    // Size of the fixed thread pool created for fileSystemExecutor in initLocalObjectsImpl().
    private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
    // Executor handed to the hive transaction manager; created in initLocalObjectsImpl()
    // and shut down in onClose().
    private ThreadPoolExecutor fileSystemExecutor;

    // Both values are resolved in setDefaultPropsIfMissing(): the catalog property is used when present,
    // otherwise the corresponding global Config value.
    private int hmsEventsBatchSizePerRpc = -1;
    private boolean enableHmsEventsIncrementalSync = false;

    // For catalogs with "type" = "hms" whose tables are iceberg tables.
    // Created lazily by getIcebergMetadataOps() and closed in onClose().
    private IcebergMetadataOps icebergMetadataOps;

    /**
     * No-argument constructor intended for tests only. It installs a {@link CatalogProperty}
     * built from a null resource and null properties and does not call an explicit super constructor.
     */
    @VisibleForTesting
    public HMSExternalCatalog() {
        catalogProperty = new CatalogProperty(null, null);
    }

    /**
     * Default constructor for HMSExternalCatalog.
     */
    public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
                              String comment) {
        super(catalogId, name, InitCatalogLog.Type.HMS, comment);
        props = PropertyConverter.convertToMetaProperties(props);
        catalogProperty = new CatalogProperty(resource, props);
    }

    /**
     * Validates the HMS specific catalog properties after the checks of the parent class.
     *
     * <p>Checked in order: {@link #FILE_META_CACHE_TTL_SECOND}, {@link #PARTITION_CACHE_TTL_SECOND}
     * and, when {@code dfs.nameservices} is configured, the HDFS HA properties of every nameservice.
     *
     * @throws DdlException if a TTL value is rejected or a required HDFS HA property is missing
     */
    @Override
    public void checkProperties() throws DdlException {
        super.checkProperties();
        validateCacheTtlSecond(FILE_META_CACHE_TTL_SECOND);
        validateCacheTtlSecond(PARTITION_CACHE_TTL_SECOND);
        validateDfsHaProperties();
    }

    /**
     * Rejects a cache TTL property whose value parses to less than CACHE_TTL_DISABLE_CACHE.
     * An unset property is accepted.
     */
    private void validateCacheTtlSecond(String propertyName) throws DdlException {
        String ttlSecond = catalogProperty.getOrDefault(propertyName, null);
        if (Objects.isNull(ttlSecond)) {
            return;
        }
        // A non-integer value falls back to CACHE_NO_TTL.
        // NOTE(review): both constants are inherited; presumably CACHE_NO_TTL is below
        // CACHE_TTL_DISABLE_CACHE so that unparsable input is rejected here - confirm in the parent class.
        if (NumberUtils.toInt(ttlSecond, CACHE_NO_TTL) < CACHE_TTL_DISABLE_CACHE) {
            throw new DdlException("The parameter " + propertyName + " is wrong, value is " + ttlSecond);
        }
    }

    /**
     * Checks that every configured nameservice carries its HA properties, for example:
     * 'dfs.nameservices'='your-nameservice',
     * 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
     * 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
     * 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
     * 'dfs.client.failover.proxy.provider.your-nameservice'='xxx'
     */
    private void validateDfsHaProperties() throws DdlException {
        String dfsNameservices = catalogProperty.getOrDefault(HdfsResource.DSF_NAMESERVICES, "");
        if (Strings.isNullOrEmpty(dfsNameservices)) {
            return;
        }

        for (String rawService : dfsNameservices.split(",")) {
            // Trim so that a list written as "ns1, ns2" is validated against the keys of "ns2"
            // rather than " ns2".
            String dfsservice = rawService.trim();
            String namenodes = catalogProperty.getOrDefault("dfs.ha.namenodes." + dfsservice, "");
            if (Strings.isNullOrEmpty(namenodes)) {
                throw new DdlException("Missing dfs.ha.namenodes." + dfsservice + " property");
            }
            for (String rawName : namenodes.split(",")) {
                String name = rawName.trim();
                String address = catalogProperty.getOrDefault("dfs.namenode.rpc-address." + dfsservice + "." + name,
                        "");
                if (Strings.isNullOrEmpty(address)) {
                    throw new DdlException(
                            "Missing dfs.namenode.rpc-address." + dfsservice + "." + name + " property");
                }
            }
            String failoverProvider = catalogProperty.getOrDefault("dfs.client.failover.proxy.provider." + dfsservice,
                    "");
            if (Strings.isNullOrEmpty(failoverProvider)) {
                throw new DdlException(
                        "Missing dfs.client.failover.proxy.provider." + dfsservice + " property");
            }
        }
    }

    /**
     * Creates the pre-execution authenticator from {@code getConfiguration()} the first time it is
     * requested; later calls keep the existing instance.
     */
    @Override
    protected synchronized void initPreExecutionAuthenticator() {
        if (preExecutionAuthenticator != null) {
            // Already created by an earlier call.
            return;
        }
        preExecutionAuthenticator = new PreExecutionAuthenticator(getConfiguration());
    }

    /**
     * Builds the local, non-persisted objects of this catalog: the pre-execution authenticator, the
     * hive metadata operations (backed either by a JDBC client config or by a {@link HiveConf}),
     * the pre-authenticated thread pool, the file system executor and the hive transaction manager.
     */
    @Override
    protected void initLocalObjectsImpl() {
        initPreExecutionAuthenticator();

        // Exactly one of the two client configurations is populated; the other one stays null.
        String hiveMetastoreType = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_TYPE, "");
        boolean useJdbcMetastore = hiveMetastoreType.equalsIgnoreCase("jdbc");
        JdbcClientConfig jdbcClientConfig = useJdbcMetastore ? newMetastoreJdbcClientConfig() : null;
        HiveConf hiveConf = useJdbcMetastore ? null : newMetastoreHiveConf();
        HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);

        String preAuthPoolName = String.format("hms_iceberg_catalog_%s_executor_pool", name);
        threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, Integer.MAX_VALUE, preAuthPoolName, true,
                preExecutionAuthenticator);

        FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(
                Env.getCurrentEnv().getExtMetaCacheMgr(),
                this.bindBrokerName(),
                this.catalogProperty.getHadoopProperties());

        String fileSystemPoolName = String.format("hms_committer_%s_file_system_executor_pool", name);
        this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
                FILE_SYSTEM_EXECUTOR_THREAD_NUM, Integer.MAX_VALUE, fileSystemPoolName, true);

        transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
                fileSystemExecutor);
        metadataOps = hiveOps;
    }

    /** Fills a JDBC client config from the user/password/jdbc_url/driver_* catalog properties. */
    private JdbcClientConfig newMetastoreJdbcClientConfig() {
        JdbcClientConfig config = new JdbcClientConfig();
        config.setUser(catalogProperty.getOrDefault("user", ""));
        config.setPassword(catalogProperty.getOrDefault("password", ""));
        config.setJdbcUrl(catalogProperty.getOrDefault("jdbc_url", ""));
        config.setDriverUrl(catalogProperty.getOrDefault("driver_url", ""));
        config.setDriverClass(catalogProperty.getOrDefault("driver_class", ""));
        return config;
    }

    /** Builds a HiveConf from the hadoop properties, then sets the metastore client socket timeout. */
    private HiveConf newMetastoreHiveConf() {
        HiveConf conf = new HiveConf();
        catalogProperty.getHadoopProperties().forEach(conf::set);
        // Applied after the user properties, so the configured timeout always wins.
        HiveConf.setVar(conf, HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
                String.valueOf(Config.hive_metastore_client_timeout_second));
        return conf;
    }

    /**
     * Resets the catalog through the parent implementation and then closes the current metadata
     * operations object, if one exists.
     *
     * @param invalidCache forwarded unchanged to the parent implementation
     */
    @Override
    public void resetToUninitialized(boolean invalidCache) {
        super.resetToUninitialized(invalidCache);
        if (metadataOps == null) {
            // Nothing was created yet, so there is nothing to close.
            return;
        }
        metadataOps.close();
    }

    /**
     * Releases the resources owned by this class after the parent has closed its own: shuts down the
     * file system executor and closes the iceberg metadata operations, when they were created.
     */
    @Override
    public void onClose() {
        super.onClose();
        if (fileSystemExecutor != null) {
            ThreadPoolManager.shutdownExecutorService(fileSystemExecutor);
        }
        if (icebergMetadataOps != null) {
            icebergMetadataOps.close();
        }
    }

    /**
     * Lists the table names of a database through the metadata operations of this catalog.
     *
     * @param ctx    session context; not used by this implementation
     * @param dbName database name; it is reduced with
     *               {@link ClusterNamespace#getNameFromFullName(String)} before the lookup
     * @return the table names reported by the metadata operations
     */
    @Override
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        makeSureInitialized();
        String shortDbName = ClusterNamespace.getNameFromFullName(dbName);
        return metadataOps.listTableNames(shortDbName);
    }

    /**
     * Checks through the metadata operations of this catalog whether a table exists.
     *
     * @param ctx     session context; not used by this implementation
     * @param dbName  database name; it is reduced with
     *                {@link ClusterNamespace#getNameFromFullName(String)} before the lookup
     * @param tblName table name, passed through unchanged
     * @return the answer of the metadata operations
     */
    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        // metadataOps is only assigned in initLocalObjectsImpl(); make sure the catalog is initialized
        // before dereferencing it, as the other metadataOps based methods of this class do.
        makeSureInitialized();
        return metadataOps.tableExist(ClusterNamespace.getNameFromFullName(dbName), tblName);
    }

    /**
     * Checks whether a table is known to the local database object of this catalog.
     *
     * @param dbName  database name used for the {@code getDbNullable} lookup
     * @param tblName table name; it is reduced with
     *                {@link ClusterNamespace#getNameFromFullName(String)} before the lookup
     * @return {@code false} when the database is not found, otherwise whether the database
     *         returns the table
     */
    @Override
    public boolean tableExistInLocal(String dbName, String tblName) {
        makeSureInitialized();
        HMSExternalDatabase localDb = (HMSExternalDatabase) getDbNullable(dbName);
        return localDb != null
                && localDb.getTable(ClusterNamespace.getNameFromFullName(tblName)).isPresent();
    }

    /**
     * Returns the cached HMS client held by the hive metadata operations of this catalog.
     * The catalog is initialized first if necessary.
     *
     * @return the client of the {@link HiveMetadataOps} stored in {@code metadataOps}
     */
    public HMSCachedClient getClient() {
        makeSureInitialized();
        HiveMetadataOps hiveOps = (HiveMetadataOps) metadataOps;
        return hiveOps.getClient();
    }

    /**
     * Removes a database from the local bookkeeping of this catalog and invalidates its entry in the
     * external meta cache manager.
     *
     * <p>With the meta cache enabled the entry is invalidated in {@code metaCache} (only when the
     * catalog is initialized); otherwise the database is removed from the name-to-id and id-to-db maps.
     *
     * @param dbName name of the database to unregister
     */
    @Override
    public void unregisterDatabase(String dbName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("drop database [{}]", dbName);
        }
        if (useMetaCache.get()) {
            if (isInitialized()) {
                metaCache.invalidate(dbName, Util.genIdByName(name, dbName));
            }
        } else {
            Long dbId = dbNameToId.remove(dbName);
            if (dbId == null) {
                LOG.warn("drop database [{}] failed", dbName);
            } else {
                // Only touch idToDb with a real id: removing a null key throws a NullPointerException
                // on null-hostile maps such as ConcurrentHashMap, which would also skip the cache
                // invalidation below.
                idToDb.remove(dbId);
            }
        }
        Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(getId(), dbName);
    }

    /**
     * Builds a database object for the given id and name and records it in this catalog.
     *
     * <p>Without the meta cache the database goes into the name-to-id and id-to-db maps. With the
     * meta cache it is pushed into {@code metaCache}, but only when the catalog is initialized.
     *
     * @param dbId   id of the database to register
     * @param dbName name of the database to register
     */
    @Override
    public void registerDatabase(long dbId, String dbName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("create database [{}]", dbName);
        }

        ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName, null, dbId, logType, false);
        if (!useMetaCache.get()) {
            dbNameToId.put(dbName, dbId);
            idToDb.put(dbId, db);
            return;
        }
        if (isInitialized()) {
            metaCache.updateCache(db.getRemoteName(), db.getFullName(), db,
                    Util.genIdByName(name, db.getFullName()));
        }
    }

    /**
     * Reacts to updated catalog properties: after the parent has handled them, the meta store cache
     * of this catalog is re-initialized when either cache TTL property is part of the update.
     *
     * @param updatedProps the properties that were updated
     */
    @Override
    public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
        super.notifyPropertiesUpdated(updatedProps);
        String fileMetaCacheTtl = updatedProps.get(FILE_META_CACHE_TTL_SECOND);
        String partitionCacheTtl = updatedProps.get(PARTITION_CACHE_TTL_SECOND);
        boolean ttlTouched = fileMetaCacheTtl != null || partitionCacheTtl != null;
        if (ttlTouched) {
            Env.getCurrentEnv().getExtMetaCacheMgr().getMetaStoreCache(this).init();
        }
    }

    /**
     * Applies the defaults of the parent class, enables fallback to simple auth when
     * {@code ifNotSetFallbackToSimpleAuth()} reports it as unset, and resolves the HMS event settings
     * of this catalog: the catalog property is used when present, otherwise the global {@link Config}
     * value.
     *
     * @param isReplay forwarded unchanged to the parent implementation
     * @throws NumberFormatException if the events batch size property is present but not an integer
     */
    @Override
    public void setDefaultPropsIfMissing(boolean isReplay) {
        super.setDefaultPropsIfMissing(isReplay);
        if (ifNotSetFallbackToSimpleAuth()) {
            // always allow fallback to simple auth, so to support both kerberos and simple auth
            catalogProperty.addProperty(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true");
        }

        Map<String, String> properties = catalogProperty.getProperties();
        if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
            // Constant-first comparison: a key that maps to null reads as "not enabled"
            // instead of raising a NullPointerException.
            enableHmsEventsIncrementalSync =
                    "true".equals(properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC));
        } else {
            enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
        }

        if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
            // parseInt yields the primitive directly; no Integer is boxed only to be unboxed again.
            hmsEventsBatchSizePerRpc = Integer.parseInt(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
        } else {
            hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
        }
    }

    /**
     * @return the value of the {@code HMSProperties.HIVE_METASTORE_URIS} catalog property,
     *         or an empty string when it is not set
     */
    public String getHiveMetastoreUris() {
        return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
    }

    /**
     * @return the value of the {@code HMSProperties.HIVE_VERSION} catalog property,
     *         or an empty string when it is not set
     */
    public String getHiveVersion() {
        return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
    }

    /**
     * @return the HMS events batch size per RPC as resolved by {@code setDefaultPropsIfMissing};
     *         the field is initialized to -1 before that method has run
     */
    public int getHmsEventsBatchSizePerRpc() {
        return hmsEventsBatchSizePerRpc;
    }

    /**
     * @return whether HMS events incremental sync is enabled, as resolved by
     *         {@code setDefaultPropsIfMissing}; the field is initialized to false before that method has run
     */
    public boolean isEnableHmsEventsIncrementalSync() {
        return enableHmsEventsIncrementalSync;
    }

    /**
     * Returns the iceberg metadata operations of this catalog, creating them on first use from an
     * iceberg {@link HiveCatalog} built for this catalog. The catalog is initialized first if necessary.
     *
     * <p>NOTE(review): the lazy creation is not guarded by a lock in this method; whether callers can
     * race here depends on code outside this class - confirm.
     *
     * @return the cached or newly created {@link IcebergMetadataOps}
     */
    public IcebergMetadataOps getIcebergMetadataOps() {
        makeSureInitialized();
        if (icebergMetadataOps != null) {
            return icebergMetadataOps;
        }
        HiveCatalog hiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
        icebergMetadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, hiveCatalog);
        return icebergMetadataOps;
    }
}