HiveCompatibleCatalog.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import shade.doris.hive.org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
protected Configuration conf;
protected ClientPool<IMetaStoreClient, TException> clients;
protected FileIO fileIO;
protected String catalogName;
public void initialize(String name, FileIO fileIO,
ClientPool<IMetaStoreClient, TException> clients) {
this.catalogName = name;
this.fileIO = fileIO;
this.clients = clients;
}
protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
if (fileIOImpl == null) {
/* when use the S3FileIO, we need some custom configurations,
* so HadoopFileIO is used in the superclass by default
* we can add better implementations to derived class just like the implementation in DLFCatalog.
*/
FileIO io = new HadoopFileIO(hadoopConf);
io.initialize(properties);
return io;
} else {
return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
}
}
@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
return null;
}
@Override
protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
return tableIdentifier.namespace().levels().length == 1;
}
protected boolean isValidNamespace(Namespace namespace) {
return namespace.levels().length != 1;
}
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
if (isValidNamespace(namespace)) {
throw new NoSuchTableException("Invalid namespace: %s", namespace);
}
String dbName = namespace.level(0);
try {
return clients.run(client -> client.getAllTables(dbName))
.stream()
.map(tbl -> TableIdentifier.of(dbName, tbl))
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
throw new UnsupportedOperationException(
"Cannot drop table " + tableIdentifier + " : dropTable is not supported");
}
@Override
public void renameTable(TableIdentifier sourceTbl, TableIdentifier targetTbl) {
throw new UnsupportedOperationException(
"Cannot rename table " + sourceTbl + " : renameTable is not supported");
}
@Override
public void createNamespace(Namespace namespace, Map<String, String> props) {
throw new UnsupportedOperationException(
"Cannot create namespace " + namespace + " : createNamespace is not supported");
}
@Override
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
if (isValidNamespace(namespace) && !namespace.isEmpty()) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
if (!namespace.isEmpty()) {
return new ArrayList<>();
}
List<Namespace> namespaces = new ArrayList<>();
List<String> databases;
try {
databases = clients.run(client -> client.getAllDatabases());
for (String database : databases) {
namespaces.add(Namespace.of(database));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return namespaces;
}
@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
if (isValidNamespace(namespace)) {
throw new NoSuchTableException("Invalid namespace: %s", namespace);
}
String dbName = namespace.level(0);
try {
return clients.run(client -> client.getDatabase(dbName)).getParameters();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
throw new UnsupportedOperationException(
"Cannot drop namespace " + namespace + " : dropNamespace is not supported");
}
@Override
public boolean setProperties(Namespace namespace, Map<String, String> props) throws NoSuchNamespaceException {
throw new UnsupportedOperationException(
"Cannot set namespace properties " + namespace + " : setProperties is not supported");
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> pNames) throws NoSuchNamespaceException {
throw new UnsupportedOperationException(
"Cannot remove properties " + namespace + " : removeProperties is not supported");
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
}