AbstractPaimonProperties.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.metastore;

import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.S3Properties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Base class for Paimon metastore property holders.
 *
 * <p>It collects the options shared by all Paimon catalog flavors (warehouse location,
 * metastore type, user-supplied {@code paimon.*} pass-through keys and S3 credentials)
 * into a Paimon {@link Options} instance, and lets subclasses contribute their own
 * options through {@link #appendCustomCatalogOptions()}.
 */
public abstract class AbstractPaimonProperties extends MetastoreProperties {
    /**
     * Prefix of user-supplied properties that are forwarded to Paimon with the prefix removed.
     * The prefix is matched case-insensitively.
     */
    private static final String USER_PROPERTY_PREFIX = "paimon.";

    @ConnectorProperty(
            names = {"warehouse"},
            description = "The location of the Paimon warehouse. This is where the tables will be stored."
    )
    protected String warehouse;

    /** Authenticator exposed to callers; initialized here to an anonymous {@link ExecutionAuthenticator}. */
    @Getter
    protected ExecutionAuthenticator executionAuthenticator = new ExecutionAuthenticator() {
    };

    /** Options assembled by {@link #buildCatalogOptions(List)}; {@code null} until that method has run. */
    @Getter
    protected Options catalogOptions;

    /** Lazily computed map copy of {@link #catalogOptions}; cleared whenever the options are rebuilt. */
    private final AtomicReference<Map<String, String>> catalogOptionsMapRef = new AtomicReference<>();

    protected AbstractPaimonProperties(Map<String, String> props) {
        super(props);
    }

    /**
     * Returns the Paimon catalog type of this implementation. The concrete value is defined
     * by each subclass.
     */
    public abstract String getPaimonCatalogType();

    /**
     * Creates the Paimon catalog described by these properties.
     *
     * @param catalogName the name of the catalog to create
     * @param storagePropertiesList the list of configured storage backends
     * @return the initialized catalog
     */
    public abstract Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList);

    /**
     * Adapt S3 storage properties for Apache Paimon's S3 file system.
     *
     * <p>Paimon's S3 file system does not follow the standard Hadoop-compatible
     * configuration keys (like fs.s3a.access.key). Instead, it expects specific
     * keys such as "s3.access.key", "s3.secret.key", etc.
     *
     * <p>Therefore, we explicitly map our internal S3 configuration (usually designed
     * for HDFS-compatible systems) to Paimon's expected format.
     *
     * <p>Only the first storage backend of type {@link StorageProperties.Type#S3} is used;
     * if there is none, the catalog options are left untouched.
     *
     * <p>See: org.apache.paimon.s3.S3Loader
     *
     * @param storagePropertiesList the list of configured storage backends
     */
    protected void appendS3PropertiesIsNeeded(List<StorageProperties> storagePropertiesList) {
        S3Properties s3Properties = storagePropertiesList.stream()
                .filter(storageProperties -> storageProperties.getType() == StorageProperties.Type.S3)
                .map(S3Properties.class::cast)
                .findFirst()
                .orElse(null);
        if (s3Properties == null) {
            return;
        }
        // Each Paimon key must be fed from the matching getter: access key -> access key,
        // secret key -> secret key.
        catalogOptions.set("s3.access.key", s3Properties.getAccessKey());
        catalogOptions.set("s3.secret.key", s3Properties.getSecretKey());
        catalogOptions.set("s3.endpoint", s3Properties.getEndpoint());
        catalogOptions.set("s3.region", s3Properties.getRegion());
    }

    /**
     * Adds the options common to every Paimon catalog to {@link #catalogOptions}.
     *
     * <p>In order: the warehouse location (when not blank), the metastore type, every original
     * property whose key starts with {@code paimon.} (prefix stripped, matched
     * case-insensitively), and finally the S3 settings, which therefore override
     * same-named user keys.
     *
     * @param storagePropertiesList the list of configured storage backends
     */
    protected void appendCatalogOptions(List<StorageProperties> storagePropertiesList) {
        if (StringUtils.isNotBlank(warehouse)) {
            catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
        }
        catalogOptions.set(CatalogOptions.METASTORE.key(), getMetastoreType());
        origProps.forEach((k, v) -> {
            // Locale.ROOT keeps the match independent of the JVM default locale
            // (e.g. Turkish lower-casing of 'I' would otherwise break "PAIMON.").
            if (k.toLowerCase(Locale.ROOT).startsWith(USER_PROPERTY_PREFIX)) {
                String newKey = k.substring(USER_PROPERTY_PREFIX.length());
                if (StringUtils.isNotBlank(newKey)) {
                    catalogOptions.set(newKey, v);
                }
            }
        });
        appendS3PropertiesIsNeeded(storagePropertiesList);
    }

    /**
     * Build catalog options including common and subclass-specific ones.
     *
     * <p>Every call starts from a fresh {@link Options} instance and discards the map cached by
     * {@link #getCatalogOptionsMap()}, so the next call to that method reflects the rebuilt options.
     *
     * @param storagePropertiesList the list of configured storage backends
     */
    public void buildCatalogOptions(List<StorageProperties> storagePropertiesList) {
        // Drop the map derived from a previous build so it is never served for the new options.
        catalogOptionsMapRef.set(null);
        catalogOptions = new Options();
        appendCatalogOptions(storagePropertiesList);
        appendCustomCatalogOptions();
        // Clear again in case a reader cached a snapshot while the options were still being filled.
        catalogOptionsMapRef.set(null);
    }

    /**
     * Returns the catalog options as a plain map, computing and caching it on first use.
     *
     * @return a map holding every key of {@link #catalogOptions} with its value
     * @throws IllegalStateException if {@link #buildCatalogOptions(List)} has not been called yet
     */
    public Map<String, String> getCatalogOptionsMap() {
        // Return the cached map if already initialized
        Map<String, String> existing = catalogOptionsMapRef.get();
        if (existing != null) {
            return existing;
        }

        // Check that the catalog options source is available
        if (catalogOptions == null) {
            throw new IllegalStateException("Catalog options have not been initialized. Call"
                    + " buildCatalogOptions first.");
        }

        // Construct the map manually using the provided keys
        Map<String, String> computed = new HashMap<>();
        for (String key : catalogOptions.keySet()) {
            computed.put(key, catalogOptions.get(key));
        }

        // Attempt to set the constructed map atomically; only one thread wins
        if (catalogOptionsMapRef.compareAndSet(null, computed)) {
            return computed;
        }
        // Another thread published its map first; prefer that one, but fall back to ours
        // if the cache was cleared again in the meantime.
        Map<String, String> published = catalogOptionsMapRef.get();
        return published != null ? published : computed;
    }


    /**
     * Hook method for subclasses to append metastore-specific or custom catalog options.
     *
     * <p>This method is invoked after common catalog options (e.g., warehouse path,
     * metastore type, user-defined keys, and S3 compatibility mappings) have been
     * added to the {@link org.apache.paimon.options.Options} instance.
     *
     * <p>Subclasses should implement this method to inject additional configuration
     * required for their specific metastore or environment. For example:
     *
     * <ul>
     *   <li>DLF-based catalog may require a custom metastore client class.</li>
     *   <li>HMS-based catalog may include URI and client pool parameters.</li>
     *   <li>Other environments may inject authentication, endpoint, or caching options.</li>
     * </ul>
     *
     * <p>If the subclass does not require any special options beyond the common ones,
     * it can implement this method with an empty body.
     */
    protected abstract void appendCustomCatalogOptions();

    /**
     * Returns the metastore type identifier used by the Paimon catalog factory.
     *
     * <p>This identifier must match one of the known metastore types supported by
     * Apache Paimon. Internally, the value returned here is used to configure the
     * `metastore` option in {@code Options}, which determines the specific
     * {@link org.apache.paimon.catalog.CatalogFactory} implementation to be used
     * when instantiating the catalog.
     *
     * <p>You can find valid identifiers by reviewing implementations of the
     * {@link org.apache.paimon.catalog.CatalogFactory} interface. Each implementation
     * declares its identifier via a static {@code IDENTIFIER} field or equivalent constant.
     *
     * <p>Examples:
     * <ul>
     *   <li>{@code "filesystem"} - for {@link org.apache.paimon.catalog.FileSystemCatalogFactory}</li>
     *   <li>{@code "hive"} - for {@link org.apache.paimon.hive.HiveCatalogFactory}</li>
     * </ul>
     *
     * @return the metastore type identifier string
     */
    protected abstract String getMetastoreType();
}