CatalogConnectivityTestCoordinator.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.connectivity;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.property.metastore.HiveGlueMetaStoreProperties;
import org.apache.doris.datasource.property.metastore.HiveHMSProperties;
import org.apache.doris.datasource.property.metastore.IcebergGlueMetaStoreProperties;
import org.apache.doris.datasource.property.metastore.IcebergHMSMetaStoreProperties;
import org.apache.doris.datasource.property.metastore.IcebergRestProperties;
import org.apache.doris.datasource.property.metastore.IcebergS3TablesMetaStoreProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.HdfsProperties;
import org.apache.doris.datasource.property.storage.MinioProperties;
import org.apache.doris.datasource.property.storage.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
/**
* Coordinator for catalog connectivity testing.
* This class orchestrates the testing of metadata services and storage systems
* when creating external catalogs with test_connection=true.
*/
public class CatalogConnectivityTestCoordinator {
private static final Logger LOG = LogManager.getLogger(CatalogConnectivityTestCoordinator.class);
private final String catalogName;
private final MetastoreProperties metastoreProperties;
private final Map<StorageProperties.Type, StorageProperties> storagePropertiesMap;
private String warehouseLocation;
public CatalogConnectivityTestCoordinator(
String catalogName,
MetastoreProperties metastoreProperties,
Map<StorageProperties.Type, StorageProperties> storagePropertiesMap) {
this.catalogName = catalogName;
this.metastoreProperties = metastoreProperties;
this.storagePropertiesMap = storagePropertiesMap;
}
/**
* Run all connectivity tests for the catalog.
*
* @throws DdlException if any test fails
*/
public void runTests() throws DdlException {
// 1. Test metadata service
testMetadataService();
// 2. Test object storage for warehouse (if applicable)
StorageProperties testObjectStorageProperties = getTestObjectStorageProperties();
if (testObjectStorageProperties != null) {
testObjectStorageForWarehouse(testObjectStorageProperties);
}
// 3. Test explicitly configured HDFS (if applicable)
if (shouldTestHdfs()) {
testExplicitlyConfiguredHdfs();
}
}
/**
* Test metadata service connectivity (HMS, Glue, REST).
* Also stores the warehouse location to class variable for later use.
*
* @throws DdlException if test fails
*/
private void testMetadataService() throws DdlException {
MetaConnectivityTester metaTester = createMetaTester(metastoreProperties);
LOG.info("Testing {} connectivity for catalog '{}'", metaTester.getTestType(), catalogName);
try {
metaTester.testConnection();
} catch (Exception e) {
throw new DdlException(metaTester.getTestType() + " connectivity test failed: "
+ e.getMessage());
}
// Store warehouse location for later use
this.warehouseLocation = metaTester.getTestLocation();
if (StringUtils.isNotBlank(this.warehouseLocation)) {
LOG.debug("Got warehouse location from metadata service: {}", this.warehouseLocation);
}
}
/**
* Check if object storage test should be performed.
* Also caches the matched storage for later use in testObjectStorageForWarehouse().
*/
private StorageProperties getTestObjectStorageProperties() {
if (StringUtils.isBlank(this.warehouseLocation)) {
LOG.debug("Skip object storage test: no warehouse location from metadata service for catalog '{}'",
catalogName);
return null;
}
StorageProperties matchedObjectStorage = findMatchingObjectStorage(this.warehouseLocation);
if (matchedObjectStorage == null) {
LOG.debug("Skip object storage test: no storage configured for warehouse '{}' in catalog '{}'",
this.warehouseLocation, catalogName);
return null;
}
return matchedObjectStorage;
}
/**
* Test object storage that matches the warehouse location from metadata service.
* Uses the cached matchedObjectStorage from shouldTestObjectStorage().
*
* @throws DdlException if test fails
*/
private void testObjectStorageForWarehouse(StorageProperties testObjectStorageProperties) throws DdlException {
LOG.info("Testing {} connectivity for warehouse '{}' in catalog '{}'",
testObjectStorageProperties.getStorageName(), this.warehouseLocation, catalogName);
StorageConnectivityTester tester = createStorageTester(testObjectStorageProperties, this.warehouseLocation);
// Test FE connection
try {
tester.testFeConnection();
} catch (Exception e) {
throw new DdlException(tester.getTestType() + " connectivity test failed: " + e.getMessage());
}
// Test BE connection
try {
tester.testBeConnection();
} catch (Exception e) {
throw new DdlException(tester.getTestType()
+ " connectivity test failed (compute node): " + e.getMessage());
}
}
/**
* Find object storage that can handle the given warehouse location.
*
* @param warehouse warehouse location
* @return matching storage properties, or null if not found
*/
private StorageProperties findMatchingObjectStorage(String warehouse) {
// Check S3/Minio
if (warehouse.startsWith("s3://") || warehouse.startsWith("s3a://")) {
// Priority: Minio > S3 (if Minio is configured, use it for s3://)
StorageProperties minio = storagePropertiesMap.get(StorageProperties.Type.MINIO);
if (minio != null && isConfiguredStorage(minio)) {
return minio;
}
StorageProperties s3 = storagePropertiesMap.get(StorageProperties.Type.S3);
if (s3 != null && isConfiguredStorage(s3)) {
return s3;
}
}
return null;
}
/**
* Check if storage has credentials configured.
* Check for access key, IAM role, or other authentication methods.
*/
private boolean isConfiguredStorage(StorageProperties storage) {
// For S3: check access key or IAM role
if (storage instanceof S3Properties) {
S3Properties s3 = (S3Properties) storage;
return StringUtils.isNotBlank(s3.getAccessKey())
|| StringUtils.isNotBlank(s3.getS3IAMRole());
}
// For Minio: check access key
if (storage instanceof MinioProperties) {
MinioProperties minio = (MinioProperties) storage;
return StringUtils.isNotBlank(minio.getAccessKey());
}
// For other storage types, assume configured if exists
return true;
}
/**
* Check if HDFS test should be performed.
*/
private boolean shouldTestHdfs() {
StorageProperties hdfsStorage = storagePropertiesMap.get(StorageProperties.Type.HDFS);
if (!(hdfsStorage instanceof HdfsProperties)) {
return false;
}
HdfsProperties hdfs = (HdfsProperties) hdfsStorage;
if (!hdfs.isExplicitlyConfigured()) {
LOG.debug("Skip HDFS test: not explicitly configured by user for catalog '{}'", catalogName);
return false;
}
if (StringUtils.isBlank(hdfs.getDefaultFS())) {
LOG.debug("Skip HDFS test: fs.defaultFS not configured for catalog '{}'", catalogName);
return false;
}
return true;
}
/**
* Test explicitly configured HDFS.
*
* @throws DdlException if test fails
*/
private void testExplicitlyConfiguredHdfs() throws DdlException {
HdfsProperties hdfs = (HdfsProperties) storagePropertiesMap.get(StorageProperties.Type.HDFS);
String defaultFS = hdfs.getDefaultFS();
LOG.info("Testing HDFS connectivity for '{}' in catalog '{}'", defaultFS, catalogName);
StorageConnectivityTester tester = createStorageTester(hdfs, defaultFS);
// Test FE connection
try {
tester.testFeConnection();
} catch (Exception e) {
throw new DdlException("HDFS connectivity test failed: " + e.getMessage());
}
// Test BE connection
try {
tester.testBeConnection();
} catch (Exception e) {
throw new DdlException("HDFS connectivity test failed (compute node): " + e.getMessage());
}
}
/**
* Create metadata connectivity tester based on properties type.
*/
private MetaConnectivityTester createMetaTester(MetastoreProperties props) {
// Hive HMS
if (props instanceof HiveHMSProperties) {
HiveHMSProperties hiveProps = (HiveHMSProperties) props;
return new HiveHMSConnectivityTester(hiveProps, hiveProps.getHmsBaseProperties());
}
// Hive Glue
if (props instanceof HiveGlueMetaStoreProperties) {
HiveGlueMetaStoreProperties glueProps = (HiveGlueMetaStoreProperties) props;
return new HiveGlueMetaStoreConnectivityTester(glueProps, glueProps.getBaseProperties());
}
// Iceberg HMS
if (props instanceof IcebergHMSMetaStoreProperties) {
IcebergHMSMetaStoreProperties icebergHms = (IcebergHMSMetaStoreProperties) props;
return new IcebergHMSConnectivityTester(icebergHms, icebergHms.getHmsBaseProperties());
}
// Iceberg Glue
if (props instanceof IcebergGlueMetaStoreProperties) {
IcebergGlueMetaStoreProperties icebergGlue = (IcebergGlueMetaStoreProperties) props;
return new IcebergGlueMetaStoreConnectivityTester(icebergGlue, icebergGlue.getGlueProperties());
}
// Iceberg REST
if (props instanceof IcebergRestProperties) {
return new IcebergRestConnectivityTester((IcebergRestProperties) props);
}
// Iceberg S3Table
if (props instanceof IcebergS3TablesMetaStoreProperties) {
return new IcebergS3TablesMetaStoreConnectivityTester((IcebergS3TablesMetaStoreProperties) props);
}
// Default: no-op tester
return new MetaConnectivityTester() {
};
}
/**
* Create storage connectivity tester based on properties type and location.
*/
private StorageConnectivityTester createStorageTester(StorageProperties props, String location) {
// S3
if (props instanceof S3Properties) {
return new S3ConnectivityTester((S3Properties) props, location);
}
// Minio
if (props instanceof MinioProperties) {
return new MinioConnectivityTester((MinioProperties) props, location);
}
// HDFS
if (props instanceof HdfsProperties) {
return new HdfsConnectivityTester((HdfsProperties) props);
}
// Default: no-op tester
return new StorageConnectivityTester() {
};
}
}