AbstractIcebergProperties.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.metastore;

import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.metacache.CacheSpec;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Base class for the metastore properties of an Iceberg catalog.
 * <p>
 * It declares the connector properties shared by all Iceberg catalog flavours
 * (warehouse location, manifest cache tuning, custom FileIO implementation) and
 * provides the common steps for building an Iceberg {@link Catalog}; the concrete
 * catalog is created by subclasses through {@link #initCatalog}.
 *
 * @see org.apache.iceberg.CatalogProperties
 */
public abstract class AbstractIcebergProperties extends MetastoreProperties {

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.WAREHOUSE_LOCATION},
            required = false,
            description = "The location of the Iceberg warehouse. This is where the tables will be stored."
    )
    protected String warehouse;

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED},
            required = false,
            description = "Controls whether to use caching during manifest reads or not. Default: false."
    )
    protected String ioManifestCacheEnabled;

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS},
            required = false,
            description = "Controls the maximum duration for which an entry stays in the manifest cache. "
                    + "Must be a non-negative value. Zero means entries expire only due to memory pressure. "
                    + "Default: 60000 (60s)."
    )
    protected String ioManifestCacheExpirationIntervalMs;

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES},
            required = false,
            description = "Controls the maximum total amount of bytes to cache in manifest cache. "
                    + "Must be a positive value. Default: 104857600 (100MB)."
    )
    protected String ioManifestCacheMaxTotalBytes;

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH},
            required = false,
            description = "Controls the maximum length of file to be considered for caching. "
                    + "An InputFile will not be cached if the length is longer than this limit. "
                    + "Must be a positive value. Default: 8388608 (8MB)."
    )
    protected String ioManifestCacheMaxContentLength;

    @Getter
    @ConnectorProperty(
            names = {CatalogProperties.FILE_IO_IMPL},
            required = false,
            description = "Custom io impl for iceberg"
    )
    protected String ioImpl;

    /**
     * Authenticator exposed through the generated getter. It defaults to an anonymous
     * {@link ExecutionAuthenticator} that overrides nothing; the field is protected and
     * non-final, so subclasses can presumably replace it (verify against subclasses).
     */
    @Getter
    protected ExecutionAuthenticator executionAuthenticator = new ExecutionAuthenticator(){};

    /**
     * Returns the Iceberg catalog type string of the concrete implementation.
     */
    public abstract String getIcebergCatalogType();

    protected AbstractIcebergProperties(Map<String, String> props) {
        super(Type.ICEBERG, props);
    }

    /**
     * Builds the Iceberg {@link Catalog} for this set of properties.
     * <p>
     * The catalog properties start as a copy of the original user properties. The
     * warehouse location is then written under {@link CatalogProperties#WAREHOUSE_LOCATION}
     * when it is not blank, the manifest cache properties are added via
     * {@link #addManifestCacheProperties(Map)}, and the resulting map is handed to
     * {@link #initCatalog}, which subclasses implement to create the concrete catalog.
     *
     * @param catalogName name passed through to {@link #initCatalog}
     * @param storagePropertiesList storage properties passed through to {@link #initCatalog}
     * @return the catalog created by {@link #initCatalog}, never {@code null}
     * @throws IllegalStateException if {@link #initCatalog} returns {@code null}
     */
    public final Catalog initializeCatalog(String catalogName,
                                           List<StorageProperties> storagePropertiesList) {
        // Work on a copy so the original property map is never modified here.
        Map<String, String> catalogProps = new HashMap<>(getOrigProps());
        putIfNotBlank(catalogProps, CatalogProperties.WAREHOUSE_LOCATION, warehouse);

        // Add manifest cache properties if configured
        addManifestCacheProperties(catalogProps);

        Catalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList);

        if (catalog == null) {
            throw new IllegalStateException("Catalog must not be null after initialization.");
        }
        return catalog;
    }

    /**
     * Add manifest cache related properties to catalog properties.
     * These properties control caching behavior during manifest reads.
     * <p>
     * Each of the four manifest cache fields is copied into {@code catalogProps} when it
     * is not blank. If {@link CatalogProperties#IO_MANIFEST_CACHE_ENABLED} was set neither
     * through the field nor in the incoming map, it is set to {@code "true"} when the
     * {@link CacheSpec} built from the catalog properties reports the cache as enabled.
     *
     * @param catalogProps the catalog properties map to add manifest cache properties to
     */
    protected void addManifestCacheProperties(Map<String, String> catalogProps) {
        // Must be evaluated before the puts below: it records whether the user configured
        // the flag explicitly, either via the bound field or directly in the map.
        boolean hasIoManifestCacheEnabled = StringUtils.isNotBlank(ioManifestCacheEnabled)
                || StringUtils.isNotBlank(catalogProps.get(CatalogProperties.IO_MANIFEST_CACHE_ENABLED));

        putIfNotBlank(catalogProps, CatalogProperties.IO_MANIFEST_CACHE_ENABLED, ioManifestCacheEnabled);
        putIfNotBlank(catalogProps, CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
                ioManifestCacheExpirationIntervalMs);
        putIfNotBlank(catalogProps, CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
                ioManifestCacheMaxTotalBytes);
        putIfNotBlank(catalogProps, CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
                ioManifestCacheMaxContentLength);

        // default enable io manifest cache if the meta.cache.manifest is enabled
        if (!hasIoManifestCacheEnabled) {
            CacheSpec manifestCacheSpec = CacheSpec.fromProperties(catalogProps,
                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE,
                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE,
                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND,
                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND,
                    IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY,
                    IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY);
            if (CacheSpec.isCacheEnabled(manifestCacheSpec.isEnable(),
                    manifestCacheSpec.getTtlSecond(),
                    manifestCacheSpec.getCapacity())) {
                catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, "true");
            }
        }
    }

    /**
     * Subclasses must implement this to create the concrete Catalog instance.
     *
     * @param catalogName name of the catalog to create
     * @param catalogProps catalog properties prepared by {@link #initializeCatalog}
     * @param storagePropertiesList storage properties available to the catalog
     * @return the created catalog; {@link #initializeCatalog} rejects a {@code null} result
     */
    protected abstract Catalog initCatalog(
            String catalogName,
            Map<String, String> catalogProps,
            List<StorageProperties> storagePropertiesList
    );

    /**
     * Unified method to configure FileIO properties for Iceberg catalog.
     * This method handles all storage types (HDFS, S3, MinIO, etc.) by:
     * 1. Adding every non-null Hadoop storage config to {@code conf} (skipped when {@code conf} is null).
     * 2. Extracting the properties of one S3-compatible storage into {@code fileIOProperties}
     *    (for Iceberg S3FileIO).
     * <p>
     * Nothing is returned; both {@code conf} and {@code fileIOProperties} are populated in place.
     *
     * @param storagePropertiesList list of storage properties; must not be null, it is iterated directly
     * @param fileIOProperties options map to be populated with S3 FileIO properties
     * @param conf Hadoop Configuration to add the storage configs to; may be null
     */
    public void toFileIOProperties(List<StorageProperties> storagePropertiesList,
            Map<String, String> fileIOProperties, Configuration conf) {
        // We only support one S3-compatible storage property for FileIO configuration.
        // When multiple AbstractS3CompatibleProperties exist, prefer the first non-S3Properties one,
        // because a non-S3 type (e.g. OSSProperties, COSProperties) indicates the user has explicitly
        // specified a concrete S3-compatible storage, which should take priority over the generic S3Properties.
        AbstractS3CompatibleProperties s3Fallback = null;
        AbstractS3CompatibleProperties s3Target = null;
        for (StorageProperties storageProperties : storagePropertiesList) {
            if (conf != null && storageProperties.getHadoopStorageConfig() != null) {
                conf.addResource(storageProperties.getHadoopStorageConfig());
            }
            if (storageProperties instanceof AbstractS3CompatibleProperties) {
                // First S3-compatible entry of any kind: used only if no non-S3Properties entry exists.
                if (s3Fallback == null) {
                    s3Fallback = (AbstractS3CompatibleProperties) storageProperties;
                }
                // First S3-compatible entry that is not the generic S3Properties: preferred.
                if (s3Target == null && !(storageProperties instanceof S3Properties)) {
                    s3Target = (AbstractS3CompatibleProperties) storageProperties;
                }
            }
        }
        AbstractS3CompatibleProperties chosen = s3Target != null ? s3Target : s3Fallback;
        if (chosen != null) {
            toS3FileIOProperties(chosen, fileIOProperties);
        }
    }

    /**
     * Configure S3 FileIO properties for all S3-compatible storage types (S3, MinIO, etc.)
     * This method provides a unified way to convert S3-compatible properties to Iceberg S3FileIO format.
     * Blank values are skipped, so existing entries in {@code options} are only overwritten
     * by non-blank values.
     *
     * @param s3Properties S3-compatible properties
     * @param options Options map to be populated with S3 FileIO properties
     */
    private void toS3FileIOProperties(AbstractS3CompatibleProperties s3Properties, Map<String, String> options) {
        putIfNotBlank(options, S3FileIOProperties.ENDPOINT, s3Properties.getEndpoint());
        putIfNotBlank(options, S3FileIOProperties.PATH_STYLE_ACCESS, s3Properties.getUsePathStyle());
        putIfNotBlank(options, AwsClientProperties.CLIENT_REGION, s3Properties.getRegion());
        putIfNotBlank(options, S3FileIOProperties.ACCESS_KEY_ID, s3Properties.getAccessKey());
        putIfNotBlank(options, S3FileIOProperties.SECRET_ACCESS_KEY, s3Properties.getSecretKey());
        putIfNotBlank(options, S3FileIOProperties.SESSION_TOKEN, s3Properties.getSessionToken());
    }

    /**
     * Puts {@code value} under {@code key} only when the value is not blank.
     *
     * @param target map to write to
     * @param key property key
     * @param value candidate value; null, empty and whitespace-only values are ignored
     */
    private static void putIfNotBlank(Map<String, String> target, String key, String value) {
        if (StringUtils.isNotBlank(value)) {
            target.put(key, value);
        }
    }

    /**
     * Builds an Iceberg catalog through {@link CatalogUtil#buildIcebergCatalog}.
     * <p>
     * Note that {@code options} is modified: the {@link CatalogUtil#ICEBERG_CATALOG_TYPE}
     * entry is removed before the catalog is built.
     *
     * @param catalogName name of the catalog
     * @param options catalog options; must contain {@link CatalogProperties#CATALOG_IMPL}
     * @param conf Hadoop configuration passed to the Iceberg SDK
     * @return the catalog built by the Iceberg SDK
     * @throws IllegalArgumentException if {@code options} has no {@link CatalogProperties#CATALOG_IMPL} entry
     */
    protected Catalog buildIcebergCatalog(String catalogName, Map<String, String> options, Configuration conf) {
        // For Iceberg SDK, "type" means catalog type, such as hive, jdbc, rest.
        // But in Doris, "type" is "iceberg".
        // And Iceberg SDK does not allow with both "type" and "catalog-impl" properties,
        // So here we remove "type" and make sure "catalog-impl" is set.
        options.remove(CatalogUtil.ICEBERG_CATALOG_TYPE);
        Preconditions.checkArgument(options.containsKey(CatalogProperties.CATALOG_IMPL),
                "Iceberg catalog property '%s' must be set", CatalogProperties.CATALOG_IMPL);
        return CatalogUtil.buildIcebergCatalog(catalogName, options, conf);
    }
}