HiveMetadataOps.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
import org.apache.doris.qe.ConnectContext;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

/**
 * {@link ExternalMetadataOps} implementation for Hive catalogs.
 *
 * <p>DDL requests (create/drop database, create/drop/truncate table) and statistics/partition updates are
 * delegated to an {@link HMSCachedClient}. Branch and tag operations are not supported in Hive catalogs and
 * always fail with a {@link UserException}.
 */
public class HiveMetadataOps implements ExternalMetadataOps {
    private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);

    /** Property key carrying the database/table location URI. */
    public static final String LOCATION_URI_KEY = "location";
    /** Property key carrying the table file format. */
    public static final String FILE_FORMAT_KEY = "file_format";
    /** Doris-specific table property keys; they are passed to the table metadata with a {@code doris.} prefix. */
    public static final Set<String> DORIS_HIVE_KEYS = ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY);
    // Lower bound for the thrift client pool size, whatever the loader thread pool config says.
    private static final int MIN_CLIENT_POOL_SIZE = 8;
    private final HMSCachedClient client;
    private final HMSExternalCatalog catalog;
    // Only set by the HiveConf-based constructor; stays null when a client is injected for tests.
    private HadoopAuthenticator hadoopAuthenticator;

    /**
     * Creates metadata ops backed by a new thrift metastore client.
     *
     * <p>The client pool size is the larger of {@code MIN_CLIENT_POOL_SIZE} and
     * {@code Config.max_external_cache_loader_thread_pool_size}. The catalog's Hadoop authenticator is
     * attached to the client.
     *
     * @param hiveConf Hive configuration for the client; must not be null
     * @param catalog the owning catalog
     */
    public HiveMetadataOps(HiveConf hiveConf, HMSExternalCatalog catalog) {
        this(catalog, createCachedClient(hiveConf,
                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size)));
        hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
        client.setHadoopAuthenticator(hadoopAuthenticator);
    }

    /**
     * Creates metadata ops around an existing client (no authenticator is attached).
     *
     * @param catalog the owning catalog
     * @param client the metastore client to delegate to
     */
    @VisibleForTesting
    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
        this.catalog = catalog;
        this.client = client;
    }

    /** Returns the underlying metastore client. */
    public HMSCachedClient getClient() {
        return client;
    }

    /** Returns the catalog these operations belong to. */
    public HMSExternalCatalog getCatalog() {
        return catalog;
    }

    /**
     * Builds a thrift-based cached metastore client.
     *
     * @throws NullPointerException if {@code hiveConf} is null
     */
    private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize) {
        Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
        return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
    }

    /**
     * Creates a Hive database.
     *
     * <p>Note: {@code properties} is mutated. The {@code location} entry is removed before the remaining
     * properties are attached to the database.
     *
     * @param dbName database name
     * @param ifNotExists when true, an existing database is not an error
     * @param properties database properties; {@code location} and {@code comment} are interpreted
     * @return {@code true} if the database already existed and {@code ifNotExists} was set (nothing created);
     *         {@code false} after the database was created
     * @throws DdlException if the database exists and {@code ifNotExists} is false
     */
    @Override
    public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties)
            throws DdlException {
        // Only ask the metastore when the database is not already known locally.
        boolean exists = catalog.getDbNullable(dbName) != null || databaseExist(dbName);
        if (exists) {
            if (ifNotExists) {
                LOG.info("create database[{}] which already exists", dbName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
            }
        }
        try {
            HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata();
            catalogDatabase.setDbName(dbName);
            if (properties.containsKey(LOCATION_URI_KEY)) {
                catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY));
            }
            // The location is carried by the URI field, so it is not kept as a plain property.
            properties.remove(LOCATION_URI_KEY);
            catalogDatabase.setProperties(properties);
            catalogDatabase.setComment(properties.getOrDefault("comment", ""));
            client.createDatabase(catalogDatabase);
            LOG.info("successfully create hive database: {}", dbName);
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Resets the catalog's cached database names after a database is created. */
    @Override
    public void afterCreateDb() {
        catalog.resetMetaCacheNames();
    }

    /**
     * Drops a Hive database.
     *
     * <p>With {@code force}, every table listed in the remote database is dropped first. Tables that cannot
     * be resolved through the Doris database are skipped with a warning. Failures during the drop are
     * rethrown as {@link RuntimeException}.
     *
     * @param dbName database name
     * @param ifExists when true, a missing database is not an error
     * @param force drop contained tables before dropping the database
     * @throws DdlException if the database does not exist and {@code ifExists} is false
     */
    @Override
    public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
        ExternalDatabase dorisDb = catalog.getDbNullable(dbName);
        if (dorisDb == null) {
            if (ifExists) {
                LOG.info("drop database[{}] which does not exist", dbName);
                return;
            } else {
                // Always throws, so dorisDb is non-null below.
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
            }
        }
        try {
            if (force) {
                // try to drop all tables in the database
                List<String> remoteTableNames = listTableNames(dorisDb.getRemoteName());
                for (String remoteTableName : remoteTableNames) {
                    ExternalTable tbl = null;
                    try {
                        tbl = (ExternalTable) dorisDb.getTableOrDdlException(remoteTableName);
                    } catch (DdlException e) {
                        LOG.warn("failed to get table when force drop database [{}], table[{}], error: {}",
                                dbName, remoteTableName, e.getMessage());
                        continue;
                    }
                    dropTableImpl(tbl, true);
                }
                if (!remoteTableNames.isEmpty()) {
                    LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, remoteTableNames.size());
                }
            }
            client.dropDatabase(dorisDb.getRemoteName());
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Unregisters the dropped database from the catalog. */
    @Override
    public void afterDropDb(String dbName) {
        catalog.unregisterDatabase(dbName);
    }

    /**
     * Creates a Hive table from a {@link CreateTableStmt}.
     *
     * <p>Rejected cases:
     * <ul>
     *   <li>transactional tables;</li>
     *   <li>RANGE partitioning, or explicit partition value expressions;</li>
     *   <li>column default values in DLF-backed catalogs;</li>
     *   <li>bucketed tables unless {@code Config.enable_create_hive_bucket_table} is set;</li>
     *   <li>bucketing other than hash bucketing.</li>
     * </ul>
     *
     * <p>If no {@code owner} property is given, it defaults to the current session user when a session
     * exists. Note that this writes into the statement's property map.
     *
     * @param stmt the create-table statement
     * @return {@code true} if the table already existed and IF NOT EXISTS was set (nothing created);
     *         {@code false} after the table was created
     * @throws UserException on validation failures or when the metastore call fails
     */
    @Override
    public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
        String dbName = stmt.getDbName();
        String tblName = stmt.getTableName();
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db == null) {
            throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName());
        }
        if (tableExist(db.getRemoteName(), tblName)) {
            if (stmt.isSetIfNotExists()) {
                LOG.info("create table[{}] which already exists", tblName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName);
            }
        }
        try {
            Map<String, String> props = stmt.getProperties();
            // Default the owner to the current session user.
            ConnectContext ctx = ConnectContext.get();
            if (!props.containsKey("owner") && ctx != null) {
                props.put("owner", ctx.getCurrentUserIdentity().getUser());
            }

            // Null-safe: a "transactional" key mapped to null is simply not transactional.
            if ("true".equalsIgnoreCase(props.get("transactional"))) {
                // Example of a rejected statement:
                //   CREATE TABLE trans6(`col1` int, `col2` int) ENGINE=hive
                //   PROPERTIES ('file_format'='orc', 'compression'='zlib', 'bucketing_version'='2',
                //               'transactional'='true', 'transactional_properties'='default');
                // In hive, such a table can only be inserted into; updates report no error but are not applied.
                throw new UserException("Not support create hive transactional table.");
            }

            String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
            Map<String, String> ddlProps = new HashMap<>();
            for (Map.Entry<String, String> entry : props.entrySet()) {
                // Locale.ROOT keeps key normalization independent of the JVM default locale.
                String key = entry.getKey().toLowerCase(Locale.ROOT);
                // Doris-specific keys get a "doris." prefix, presumably to keep them apart from native
                // Hive table parameters.
                ddlProps.put(DORIS_HIVE_KEYS.contains(key) ? "doris." + key : key, entry.getValue());
            }

            List<String> partitionColNames = new ArrayList<>();
            PartitionDesc partitionDesc = stmt.getPartitionDesc();
            if (partitionDesc != null) {
                if (partitionDesc.getType() == PartitionType.RANGE) {
                    throw new UserException("Only support 'LIST' partition type in hive catalog.");
                }
                partitionColNames.addAll(partitionDesc.getPartitionColNames());
                if (!partitionDesc.getSinglePartitionDescs().isEmpty()) {
                    throw new UserException("Partition values expressions is not supported in hive catalog.");
                }
            }

            Map<String, String> properties = catalog.getProperties();
            if (HMSProperties.DLF_TYPE.equals(properties.get(HMSProperties.HIVE_METASTORE_TYPE))) {
                for (Column column : stmt.getColumns()) {
                    if (column.hasDefaultValue()) {
                        throw new UserException("Default values are not supported with `DLF` catalog.");
                    }
                }
            }

            String comment = stmt.getComment();
            Optional<String> location = Optional.ofNullable(props.get(LOCATION_URI_KEY));
            DistributionDesc bucketInfo = stmt.getDistributionDesc();
            HiveTableMetadata hiveTableMeta;
            if (bucketInfo == null) {
                hiveTableMeta = HiveTableMetadata.of(db.getRemoteName(),
                        tblName,
                        location,
                        stmt.getColumns(),
                        partitionColNames,
                        ddlProps,
                        fileFormat,
                        comment);
            } else if (!Config.enable_create_hive_bucket_table) {
                throw new UserException("Create hive bucket table need"
                        + " set enable_create_hive_bucket_table to true");
            } else if (!(bucketInfo instanceof HashDistributionDesc)) {
                throw new UserException("External hive table only supports hash bucketing");
            } else {
                hiveTableMeta = HiveTableMetadata.of(db.getRemoteName(),
                        tblName,
                        location,
                        stmt.getColumns(),
                        partitionColNames,
                        bucketInfo.getDistributionColumnNames(),
                        bucketInfo.getBuckets(),
                        ddlProps,
                        fileFormat,
                        comment);
            }
            client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
            return false;
        } catch (UserException e) {
            // Rethrow validation errors unchanged instead of wrapping them in a second UserException.
            throw e;
        } catch (Exception e) {
            throw new UserException(e.getMessage(), e);
        }
    }

    /** Resets the database's cached table names (if the database is registered) after a table is created. */
    @Override
    public void afterCreateTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
        db.ifPresent(d -> d.resetMetaCacheNames());
        LOG.info("after create table {}.{}.{}, is db exists: {}",
                getCatalog().getName(), dbName, tblName, db.isPresent());
    }

    /**
     * Drops a Hive table.
     *
     * @param dorisTable the table to drop
     * @param ifExists when true, a missing table is not an error
     * @throws DdlException if the table does not exist (and {@code ifExists} is false), if it is a
     *         transactional table, or if the metastore call fails
     */
    @Override
    public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException {
        String remoteDbName = dorisTable.getRemoteDbName();
        String remoteTblName = dorisTable.getRemoteName();
        if (!tableExist(remoteDbName, remoteTblName)) {
            if (ifExists) {
                LOG.info("drop table[{}.{}] which does not exist", remoteDbName, remoteTblName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, remoteTblName, remoteDbName);
            }
        }
        if (AcidUtils.isTransactionalTable(client.getTable(remoteDbName, remoteTblName))) {
            throw new DdlException("Not support drop hive transactional table.");
        }

        try {
            client.dropTable(remoteDbName, remoteTblName);
        } catch (Exception e) {
            throw new DdlException(e.getMessage(), e);
        }
    }

    /** Unregisters the dropped table from its database, if the database is registered. */
    @Override
    public void afterDropTable(String dbName, String tblName) {
        Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
        db.ifPresent(d -> d.unregisterTable(tblName));
        LOG.info("after drop table {}.{}.{}, is db exists: {}",
                getCatalog().getName(), dbName, tblName, db.isPresent());
    }

    /**
     * Truncates a Hive table, or only the given partitions.
     *
     * @param dorisTable the table to truncate
     * @param partitions partition names forwarded to the client's {@code truncateTable}
     * @throws DdlException if the metastore call fails
     */
    @Override
    public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions)
            throws DdlException {
        try {
            client.truncateTable(dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), partitions);
        } catch (Exception e) {
            throw new DdlException(e.getMessage(), e);
        }
    }

    /**
     * Refreshes the truncated table through the refresh manager, if both database and table are
     * registered. This step is best effort: failures are logged and not propagated.
     */
    @Override
    public void afterTruncateTable(String dbName, String tblName) {
        try {
            // Invalidate cache.
            Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
            if (db.isPresent()) {
                Optional<?> tbl = db.get().getTableForReplay(tblName);
                if (tbl.isPresent()) {
                    Env.getCurrentEnv().getRefreshManager()
                            .refreshTableInternal(db.get(), (ExternalTable) tbl.get(), 0);
                }
            }
        } catch (Exception e) {
            LOG.warn("exception when calling afterTruncateTable for db: {}, table: {}, error: {}",
                    dbName, tblName, e.getMessage(), e);
        }
    }

    /** Unsupported in Hive catalogs; always throws. */
    @Override
    public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo)
            throws UserException {
        throw new UserException("Not support create or replace branch in hive catalog.");
    }

    /** Unsupported in Hive catalogs; always throws. */
    @Override
    public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo)
            throws UserException {
        throw new UserException("Not support create or replace tag in hive catalog.");
    }

    /** Unsupported in Hive catalogs; always throws. */
    @Override
    public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException {
        throw new UserException("Not support drop tag in hive catalog.");
    }

    /** Unsupported in Hive catalogs; always throws. */
    @Override
    public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException {
        throw new UserException("Not support drop branch in hive catalog.");
    }

    /** Lists the table names of a remote database via the metastore client. */
    @Override
    public List<String> listTableNames(String dbName) {
        return client.getAllTables(dbName);
    }

    /** Returns whether the remote table exists according to the metastore client. */
    @Override
    public boolean tableExist(String dbName, String tblName) {
        return client.tableExists(dbName, tblName);
    }

    /**
     * Returns whether the lower-cased {@code dbName} appears in the metastore's database list.
     * Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM default locale.
     */
    @Override
    public boolean databaseExist(String dbName) {
        return listDatabaseNames().contains(dbName.toLowerCase(Locale.ROOT));
    }

    /** Closes the underlying metastore client. */
    @Override
    public void close() {
        client.close();
    }

    /** Lists all database names known to the metastore client. */
    public List<String> listDatabaseNames() {
        return client.getAllDatabases();
    }

    /**
     * Applies {@code update} to the table-level statistics of the remote table.
     *
     * @param nameMapping supplies the remote database and table names
     * @param update transformation from the current statistics to the new ones
     */
    public void updateTableStatistics(
            NameMapping nameMapping,
            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
        client.updateTableStatistics(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(), update);
    }

    /**
     * Applies {@code update} to the statistics of one partition of the remote table.
     *
     * @param nameMapping supplies the remote database and table names
     * @param partitionName the partition whose statistics are updated
     * @param update transformation from the current statistics to the new ones
     */
    void updatePartitionStatistics(
            NameMapping nameMapping,
            String partitionName,
            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
        client.updatePartitionStatistics(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(), partitionName,
                update);
    }

    /** Adds the given partitions (with statistics) to the remote table. */
    public void addPartitions(NameMapping nameMapping, List<HivePartitionWithStatistics> partitions) {
        client.addPartitions(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(), partitions);
    }

    /**
     * Drops the partition identified by {@code partitionValues} from the remote table.
     *
     * @param deleteData forwarded to the client's {@code dropPartition}
     */
    public void dropPartition(NameMapping nameMapping, List<String> partitionValues, boolean deleteData) {
        client.dropPartition(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName(), partitionValues,
                deleteData);
    }
}