HiveMetadataOps.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.qe.ConnectContext;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

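/**
 * Hive implementation of {@link ExternalMetadataOps}. It translates Doris DDL
 * (CREATE/DROP DATABASE, CREATE/DROP/TRUNCATE TABLE, partition and statistics
 * updates) into Hive Metastore calls through an {@link HMSCachedClient}, which
 * talks to the metastore either via Thrift or directly via JDBC.
 */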
public class HiveMetadataOps implements ExternalMetadataOps {
    public static final String LOCATION_URI_KEY = "location";
    public static final String FILE_FORMAT_KEY = "file_format";
    public static final Set<String> DORIS_HIVE_KEYS = ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY);
    private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
    private static final int MIN_CLIENT_POOL_SIZE = 8;
    private final HMSCachedClient client;
    private final HMSExternalCatalog catalog;
    private HadoopAuthenticator hadoopAuthenticator;

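    /**
     * Creates the ops with a metastore client whose Thrift connection pool is
     * sized from Config.max_external_cache_loader_thread_pool_size, but never
     * smaller than MIN_CLIENT_POOL_SIZE.
     */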
    public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
        this(catalog, createCachedClient(hiveConf,
                Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                jdbcClientConfig));
        hadoopAuthenticator = catalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
        client.setHadoopAuthenticator(hadoopAuthenticator);
    }

    @VisibleForTesting
    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
        this.catalog = catalog;
        this.client = client;
    }

    public HMSCachedClient getClient() {
        return client;
    }

    public HMSExternalCatalog getCatalog() {
        return catalog;
    }

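    /**
     * Prefers a Thrift-based metastore client when a HiveConf is available and
     * otherwise falls back to a JDBC-backed client derived from the JDBC URL;
     * currently only PostgreSQL-backed metastores are supported via JDBC.
     */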
    private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
            JdbcClientConfig jdbcClientConfig) {
        if (hiveConf != null) {
            return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
        }
        Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
        String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
        switch (dbType) {
            case JdbcResource.POSTGRESQL:
                return new PostgreSQLJdbcHMSCachedClient(jdbcClientConfig);
            default:
                throw new IllegalArgumentException("Unsupported DB type: " + dbType);
        }
    }

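    /**
     * Creates a Hive database from a legacy CreateDbStmt. A 'location' property
     * becomes the database location URI and is removed from the stored
     * properties; a 'comment' property is copied into the database comment.
     */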
    @Override
    public void createDbImpl(CreateDbStmt stmt) throws DdlException {
        String fullDbName = stmt.getFullDbName();
        Map<String, String> properties = stmt.getProperties();
        long dbId = Env.getCurrentEnv().getNextId();
        if (databaseExist(fullDbName)) {
            if (stmt.isSetIfNotExists()) {
                LOG.info("create database[{}] which already exists", fullDbName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
            }
        }
        try {
            HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata();
            catalogDatabase.setDbName(fullDbName);
            if (properties.containsKey(LOCATION_URI_KEY)) {
                catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY));
            }
            // Remove the location from the properties map so it is not stored
            // again as a plain database property.
            properties.remove(LOCATION_URI_KEY);
            catalogDatabase.setProperties(properties);
            catalogDatabase.setComment(properties.getOrDefault("comment", ""));
            client.createDatabase(catalogDatabase);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        LOG.info("createDb dbName = " + fullDbName + ", id = " + dbId);
    }

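    /**
     * Nereids counterpart of {@link #createDbImpl(CreateDbStmt)}: applies the
     * same property handling for databases created via CreateDatabaseCommand.
     */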
    @Override
    public void createDbImpl(CreateDatabaseCommand command) throws DdlException {
        String fullDbName = command.getDbName();
        Map<String, String> properties = command.getProperties();
        long dbId = Env.getCurrentEnv().getNextId();
        if (databaseExist(fullDbName)) {
            if (command.isIfNotExists()) {
                LOG.info("create database[{}] which already exists", fullDbName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, fullDbName);
            }
        }
        try {
            HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata();
            catalogDatabase.setDbName(fullDbName);
            if (properties.containsKey(LOCATION_URI_KEY)) {
                catalogDatabase.setLocationUri(properties.get(LOCATION_URI_KEY));
            }
            // Remove the location from the properties map so it is not stored
            // again as a plain database property.
            properties.remove(LOCATION_URI_KEY);
            catalogDatabase.setProperties(properties);
            catalogDatabase.setComment(properties.getOrDefault("comment", ""));
            client.createDatabase(catalogDatabase);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        LOG.info("createDb dbName = " + fullDbName + ", id = " + dbId);
    }

    @Override
    public void afterCreateDb(String dbName) {
        catalog.onRefreshCache(true);
    }

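    /**
     * Drops a Hive database. With force=true, all tables in the database are
     * dropped first so the database itself can be removed.
     */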
    @Override
    public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException {
        if (!databaseExist(dbName)) {
            if (ifExists) {
                LOG.info("drop database[{}] which does not exist", dbName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
            }
        }
        try {
            if (force) {
                // try to drop all tables in the database
                List<String> tables = listTableNames(dbName);
                for (String table : tables) {
                    dropTableImpl(dbName, table, true);
                }
                if (!tables.isEmpty()) {
                    LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tables.size());
                }
            }
            client.dropDatabase(dbName);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public void afterDropDb(String dbName) {
        catalog.onRefreshCache(true);
    }

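    /**
     * Creates a Hive table. Doris-specific properties (file_format, location)
     * are stored with a "doris." prefix; only LIST partitioning and, when
     * enable_create_hive_bucket_table is set, hash bucketing are supported.
     * Returns true if the table already exists and IF NOT EXISTS was given.
     */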
    @Override
    public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
        String dbName = stmt.getDbName();
        String tblName = stmt.getTableName();
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db == null) {
            throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName());
        }
        if (tableExist(dbName, tblName)) {
            if (stmt.isSetIfNotExists()) {
                LOG.info("create table[{}] which already exists", tblName);
                return true;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName);
            }
        }
        try {
            Map<String, String> props = stmt.getProperties();
            // Set the current user as the default owner if none was given.
            if (!props.containsKey("owner") && ConnectContext.get() != null) {
                props.put("owner", ConnectContext.get().getCurrentUserIdentity().getUser());
            }

            if (props.containsKey("transactional") && props.get("transactional").equalsIgnoreCase("true")) {
                throw new UserException("Not support create hive transactional table.");
                /*
                    CREATE TABLE trans6(
                      `col1` int,
                      `col2` int
                    )  ENGINE=hive
                    PROPERTIES (
                      'file_format'='orc',
                      'compression'='zlib',
                      'bucketing_version'='2',
                      'transactional'='true',
                      'transactional_properties'='default'
                    );
                    In Hive, such a table can only be inserted into, not updated:
                    an UPDATE reports no error, but no rows are actually changed.
                 */
            }

            String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
            Map<String, String> ddlProps = new HashMap<>();
            for (Map.Entry<String, String> entry : props.entrySet()) {
                String key = entry.getKey().toLowerCase();
                if (DORIS_HIVE_KEYS.contains(key)) {
                    ddlProps.put("doris." + key, entry.getValue());
                } else {
                    ddlProps.put(key, entry.getValue());
                }
            }
            List<String> partitionColNames = new ArrayList<>();
            if (stmt.getPartitionDesc() != null) {
                PartitionDesc partitionDesc = stmt.getPartitionDesc();
                if (partitionDesc.getType() == PartitionType.RANGE) {
                    throw new UserException("Only support 'LIST' partition type in hive catalog.");
                }
                partitionColNames.addAll(partitionDesc.getPartitionColNames());
                if (!partitionDesc.getSinglePartitionDescs().isEmpty()) {
                    throw new UserException("Partition values expressions is not supported in hive catalog.");
                }

            }
            Map<String, String> properties = catalog.getProperties();
            if (properties.containsKey(HMSProperties.HIVE_METASTORE_TYPE)
                    && properties.get(HMSProperties.HIVE_METASTORE_TYPE).equals(HMSProperties.DLF_TYPE)) {
                for (Column column : stmt.getColumns()) {
                    if (column.hasDefaultValue()) {
                        throw new UserException("Default values are not supported with `DLF` catalog.");
                    }
                }
            }
            String comment = stmt.getComment();
            Optional<String> location = Optional.ofNullable(props.get(LOCATION_URI_KEY));
            HiveTableMetadata hiveTableMeta;
            DistributionDesc bucketInfo = stmt.getDistributionDesc();
            if (bucketInfo != null) {
                if (Config.enable_create_hive_bucket_table) {
                    if (bucketInfo instanceof HashDistributionDesc) {
                        hiveTableMeta = HiveTableMetadata.of(dbName,
                                tblName,
                                location,
                                stmt.getColumns(),
                                partitionColNames,
                                ((HashDistributionDesc) bucketInfo).getDistributionColumnNames(),
                                bucketInfo.getBuckets(),
                                ddlProps,
                                fileFormat,
                                comment);
                    } else {
                        throw new UserException("External hive table only supports hash bucketing");
                    }
                } else {
                    throw new UserException("Create hive bucket table need"
                            + " set enable_create_hive_bucket_table to true");
                }
            } else {
                hiveTableMeta = HiveTableMetadata.of(dbName,
                        tblName,
                        location,
                        stmt.getColumns(),
                        partitionColNames,
                        ddlProps,
                        fileFormat,
                        comment);
            }
            client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
        } catch (Exception e) {
            throw new UserException(e.getMessage(), e);
        }
        return false;
    }

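    /**
     * Marks the database as uninitialized so its table list is lazily reloaded
     * on the next access, making the new table visible.
     */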
    @Override
    public void afterCreateTable(String dbName, String tblName) {
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db != null) {
            db.setUnInitialized(true);
        }
    }

    @Override
    public void dropTableImpl(DropTableStmt stmt) throws DdlException {
        if (stmt == null) {
            throw new DdlException("DropTableStmt is null");
        }
        dropTableImpl(stmt.getDbName(), stmt.getTableName(), stmt.isSetIfExists());
    }

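    /**
     * Drops a Hive table after verifying that the database and table exist and
     * that the table is not a Hive transactional (ACID) table.
     */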
    @Override
    public void dropTableImpl(String dbName, String tblName, boolean ifExists) throws DdlException {
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db == null) {
            if (ifExists) {
                LOG.info("database [{}] does not exist when drop table[{}]", dbName, tblName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
            }
        }
        if (!tableExist(dbName, tblName)) {
            if (ifExists) {
                LOG.info("drop table[{}] which does not exist", dbName);
                return;
            } else {
                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName);
            }
        }
        if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) {
            throw new DdlException("Not support drop hive transactional table.");
        }

        try {
            client.dropTable(dbName, tblName);
        } catch (Exception e) {
            throw new DdlException(e.getMessage(), e);
        }
    }

    @Override
    public void afterDropTable(String dbName, String tblName) {
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db != null) {
            db.setUnInitialized(true);
        }
    }

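    /**
     * Truncates the whole table, or only the given partitions when a partition
     * list is supplied.
     */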
    @Override
    public void truncateTableImpl(String dbName, String tblName, List<String> partitions)
            throws DdlException {
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db == null) {
            throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName());
        }
        try {
            client.truncateTable(dbName, tblName, partitions);
        } catch (Exception e) {
            throw new DdlException(e.getMessage(), e);
        }
    }

    @Override
    public void afterTruncateTable(String dbName, String tblName) {
        // Invalidate cache.
        Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(catalog.getId(), dbName, tblName);
        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
        if (db != null) {
            db.setLastUpdateTime(System.currentTimeMillis());
            db.setUnInitialized(true);
        }
    }

    @Override
    public List<String> listTableNames(String dbName) {
        return client.getAllTables(dbName);
    }

    @Override
    public boolean tableExist(String dbName, String tblName) {
        return client.tableExists(dbName, tblName);
    }

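    /**
     * Database names returned by the metastore are lower case, so the requested
     * name is lowercased before matching.
     */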
    @Override
    public boolean databaseExist(String dbName) {
        return listDatabaseNames().contains(dbName.toLowerCase());
    }

    @Override
    public void close() {
        client.close();
    }

    public List<String> listDatabaseNames() {
        return client.getAllDatabases();
    }

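    /**
     * Applies the given transformation to the table-level statistics stored in
     * the metastore; the partition-level variant below does the same for a
     * single partition.
     */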
    public void updateTableStatistics(
            SimpleTableInfo tableInfo,
            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
        client.updateTableStatistics(tableInfo.getDbName(), tableInfo.getTbName(), update);
    }

    void updatePartitionStatistics(
            SimpleTableInfo tableInfo,
            String partitionName,
            Function<HivePartitionStatistics, HivePartitionStatistics> update) {
        client.updatePartitionStatistics(tableInfo.getDbName(), tableInfo.getTbName(), partitionName, update);
    }

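    // Thin pass-throughs to the cached client for partition-level operations.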
    public void addPartitions(SimpleTableInfo tableInfo, List<HivePartitionWithStatistics> partitions) {
        client.addPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitions);
    }

    public void dropPartition(SimpleTableInfo tableInfo, List<String> partitionValues, boolean deleteData) {
        client.dropPartition(tableInfo.getDbName(), tableInfo.getTbName(), partitionValues, deleteData);
    }
}