// CatalogSsrfChecker.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.connectivity;

import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.HdfsProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.foundation.property.ConnectorProperty;

import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Validates user-supplied catalog URIs against SSRF attacks before a catalog is created.
 *
 * <p>Unlike {@link CatalogConnectivityTestCoordinator}, this checker opens no network
 * connections; it only invokes {@link SecurityChecker#startSSRFChecking(String)} on each URI
 * so the platform's SSRF rule engine can reject internal / private / loopback hosts.
 * Because no connectivity probe is required, the check runs unconditionally on every CREATE
 * CATALOG, regardless of the {@code test_connection} property.
 *
 * <p>Discovery is driven by the {@link ConnectorProperty#checkSsrf()} flag: to extend
 * coverage to a new property class, simply set {@code checkSsrf = true} on its endpoint /
 * URI field; no change to this class is required.
 */
public class CatalogSsrfChecker {
    private static final Logger LOG = LogManager.getLogger(CatalogSsrfChecker.class);

    /** Reflection traversal only descends into property classes under this package prefix. */
    private static final String PROPERTY_PACKAGE_PREFIX = "org.apache.doris.datasource.property.";

    /**
     * HDFS HA exposes one rpc-address per namenode under dynamic keys
     * (e.g. {@code dfs.namenode.rpc-address.<nameservice>.<nn>}); these are stored in
     * {@link HdfsProperties#getBackendConfigProperties()} rather than as declared fields,
     * so {@link ConnectorProperty#checkSsrf()} cannot reach them. They are collected
     * separately.
     */
    private static final String HDFS_NAMENODE_RPC_ADDRESS_PREFIX = "dfs.namenode.rpc-address.";

    /** Static utility class; not instantiable. */
    private CatalogSsrfChecker() {}

    /**
     * Validate every user-supplied URI on the given catalog properties.
     *
     * <p>All URIs are collected first (metastore properties, then each storage property
     * object) and are then checked one by one; the first failing URI aborts the check.
     *
     * @param catalogName name of the catalog being created; only used in log and error messages
     * @param metastoreProperties metastore properties to scan, may be {@code null}
     * @param storagePropertiesMap storage properties to scan, may be {@code null}
     * @throws DdlException if any URI fails the SSRF check
     */
    public static void check(String catalogName,
                             MetastoreProperties metastoreProperties,
                             Map<StorageProperties.Type, StorageProperties> storagePropertiesMap)
            throws DdlException {
        List<String> uris = new ArrayList<>();
        // Identity semantics: the same property object reachable via two paths is walked once,
        // independent of any equals()/hashCode() the property classes may define.
        Set<Object> visited = Sets.newIdentityHashSet();

        if (metastoreProperties != null) {
            collectAnnotatedUris(metastoreProperties, visited, uris);
        }
        if (storagePropertiesMap != null) {
            for (StorageProperties sp : storagePropertiesMap.values()) {
                collectStorageUris(sp, visited, uris);
            }
        }
        for (String uri : uris) {
            checkSingleUri(catalogName, uri);
        }
    }

    /**
     * Collect every URI worth SSRF-checking on a single storage property object.
     *
     * <p>Auto-fallback HDFS storage ({@code explicitlyConfigured=false}) is dropped wholesale,
     * i.e. both its annotated {@code fs.defaultFS} field and any dynamic namenode rpc-address
     * entries, because that {@link HdfsProperties} instance was synthesised by the
     * framework for catalogs whose user never configured HDFS, and so its values shouldn't
     * surface as user-supplied URIs.
     */
    private static void collectStorageUris(StorageProperties sp, Set<Object> visited, List<String> uris) {
        if (sp instanceof HdfsProperties && !((HdfsProperties) sp).isExplicitlyConfigured()) {
            return;
        }
        collectAnnotatedUris(sp, visited, uris);
        collectDynamicUris(sp, uris);
    }

    /**
     * Walk the object graph rooted at {@code root}, collecting non-blank String values of any
     * field annotated with {@link ConnectorProperty#checkSsrf()}{@code  = true}. Recurses into
     * field values whose runtime class lives under {@link #PROPERTY_PACKAGE_PREFIX}; the
     * identity set prevents revisits (and therefore infinite recursion on cycles).
     *
     * @param root object to scan; {@code null} is ignored
     * @param visited identity set of objects already scanned, updated in place
     * @param uris output list the discovered values are appended to
     */
    private static void collectAnnotatedUris(Object root, Set<Object> visited, List<String> uris) {
        if (root == null || !visited.add(root)) {
            return;
        }
        // getDeclaredFields() only returns fields of the class itself, so walk up the
        // hierarchy to also see fields declared on superclasses.
        Class<?> clazz = root.getClass();
        while (clazz != null && clazz != Object.class) {
            for (Field field : clazz.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers())) {
                    continue;
                }
                ConnectorProperty annotation = field.getAnnotation(ConnectorProperty.class);
                Object value = readField(field, root);
                if (value == null) {
                    continue;
                }
                if (annotation != null && annotation.checkSsrf() && value instanceof String) {
                    String s = (String) value;
                    if (StringUtils.isNotBlank(s)) {
                        uris.add(s);
                    }
                }
                if (isPropertyContainer(value)) {
                    collectAnnotatedUris(value, visited, uris);
                }
            }
            clazz = clazz.getSuperclass();
        }
    }

    /**
     * Collect URIs that live behind dynamic property keys (those not bound to a static
     * field via {@code @ConnectorProperty}). Currently this means HDFS namenode rpc-address
     * entries on {@link HdfsProperties}. The {@code isExplicitlyConfigured} gating is
     * applied by {@link #collectStorageUris} before reaching here.
     */
    private static void collectDynamicUris(StorageProperties props, List<String> uris) {
        if (!(props instanceof HdfsProperties)) {
            return;
        }
        Map<String, String> backendConfig = ((HdfsProperties) props).getBackendConfigProperties();
        if (backendConfig == null) {
            return;
        }
        for (Map.Entry<String, String> e : backendConfig.entrySet()) {
            if (e.getKey() != null && e.getKey().startsWith(HDFS_NAMENODE_RPC_ADDRESS_PREFIX)
                    && StringUtils.isNotBlank(e.getValue())) {
                uris.add(e.getValue());
            }
        }
    }

    /**
     * Reflectively read {@code field} from {@code obj}, making it accessible first.
     *
     * @return the field value, or {@code null} if the value is null or access was denied
     *         (the failure is logged and the field is then skipped by the caller)
     */
    private static Object readField(Field field, Object obj) {
        try {
            field.setAccessible(true);
            return field.get(obj);
        } catch (IllegalAccessException e) {
            LOG.warn("Failed to read field {} on {}", field.getName(), obj.getClass().getName(), e);
            return null;
        }
    }

    /**
     * True if {@code value} is itself a Doris property POJO that we should recurse into
     * (e.g. {@code HMSBaseProperties} embedded inside {@code HiveHMSProperties}).
     * Returns false for Strings, boxed primitives, collections, maps, arrays, enum constants
     * and any class outside {@link #PROPERTY_PACKAGE_PREFIX}.
     */
    private static boolean isPropertyContainer(Object value) {
        if (value == null) {
            return false;
        }
        if (value instanceof CharSequence || value instanceof Number || value instanceof Boolean
                || value instanceof Collection || value instanceof Map) {
            return false;
        }
        // instanceof (rather than Class#isEnum) also covers enum constants declared with a
        // constant-specific body, whose runtime class is an anonymous subclass of the enum.
        if (value instanceof Enum<?>) {
            return false;
        }
        Class<?> c = value.getClass();
        if (c.isArray()) {
            return false;
        }
        return c.getName().startsWith(PROPERTY_PACKAGE_PREFIX);
    }

    /**
     * Run a single URI through the SSRF check. URI may be a full URL ({@code thrift://h:p},
     * {@code hdfs://h:p}, {@code https://...}) or a bare {@code host[:port]}. We normalize to
     * {@code http://...} solely so the underlying SSRF rule engine can parse it; no HTTP
     * connection is opened here.
     *
     * <p>HMS allows comma-separated URIs (e.g. {@code thrift://h1:p,thrift://h2:p}); each is
     * validated independently so a single bad host fails the catalog creation.
     *
     * @param catalogName catalog name, used in log and error messages only
     * @param uri raw property value; blank values and tokens without an authority are skipped
     * @throws DdlException if the SSRF checker rejects (or fails on) any token
     */
    private static void checkSingleUri(String catalogName, String uri) throws DdlException {
        if (StringUtils.isBlank(uri)) {
            return;
        }
        for (String token : uri.split(",")) {
            String single = token.trim();
            if (single.isEmpty()) {
                continue;
            }
            String urlStr = normalizeToHttpUrl(single);
            if (urlStr == null) {
                continue;
            }
            try {
                SecurityChecker.getInstance().startSSRFChecking(urlStr);
            } catch (Exception e) {
                LOG.warn("SSRF check failed for catalog '{}', uri '{}'", catalogName, single, e);
                throw new DdlException("SSRF check failed for catalog '" + catalogName
                        + "', uri '" + single + "': " + e.getMessage());
            } finally {
                // Always paired with startSSRFChecking, whether or not the check threw.
                SecurityChecker.getInstance().stopSSRFChecking();
            }
        }
    }

    /**
     * Reduce {@code uri} to {@code http://<authority>} where the authority is the
     * {@code [userinfo@]host[:port]} part of the input.
     *
     * <p>A leading {@code <scheme>://} is removed only when the {@code "://"} occurs before
     * any {@code '/'}, {@code '?'} or {@code '#'}. A {@code "://"} that appears later, inside
     * a path, query or fragment (e.g. {@code internal:9083/?x=://other-host}), is user data
     * and must not be mistaken for the scheme separator; otherwise the host that gets
     * SSRF-checked would differ from the host the value actually addresses. Multi-part
     * prefixes such as {@code jdbc:mysql://} are still recognised because {@code ':'} is not
     * a delimiter.
     *
     * @param uri a single, already trimmed URI or {@code host[:port]} token
     * @return the normalized URL, or {@code null} if the input carries no authority
     *         (e.g. {@code hdfs:///path})
     */
    private static String normalizeToHttpUrl(String uri) {
        String s = uri;
        int schemeIdx = s.indexOf("://");
        // For a genuine scheme separator, the first delimiter in the string is the '/'
        // right after the ':' of "://".
        if (schemeIdx != -1 && indexOfAuthorityEnd(s) == schemeIdx + 1) {
            s = s.substring(schemeIdx + 3);
        }
        // Keep only the authority: drop path, query and fragment.
        s = s.substring(0, indexOfAuthorityEnd(s));
        if (StringUtils.isBlank(s)) {
            return null;
        }
        return "http://" + s;
    }

    /**
     * Index of the first {@code '/'}, {@code '?'} or {@code '#'} in {@code s}, or
     * {@code s.length()} if none of them occurs.
     */
    private static int indexOfAuthorityEnd(String s) {
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '/' || c == '?' || c == '#') {
                return i;
            }
        }
        return s.length();
    }
}