LocationPath.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.fs.SchemaTypeMapper;
import org.apache.doris.thrift.TFileType;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.util.Map;
import java.util.UUID;

/**
 * LocationPath is a utility class for parsing, validating, and normalizing storage location URIs.
 * It supports various storage backends such as HDFS, S3, OSS, and local file systems.
 * <p>
 * Core responsibilities include:
 * - Extracting the schema (e.g., "s3", "hdfs", "file") from a location string.
 * - Normalizing the location path using the corresponding {@link StorageProperties} for the schema.
 * - Deriving the file system identifier (e.g., "s3://bucket") to uniquely identify a storage endpoint.
 * - Mapping the schema to corresponding {@link TFileType} and {@link FileSystemType} for backend access.
 * <p>
 * Special handling:
 * - Supports both standard ("scheme://") and nonstandard ("scheme:/") URI formats.
 * - If the schema is missing (e.g., for local paths), it gracefully falls back to treating the path as local/HDFS.
 * - Includes fallback compatibility logic for legacy schema mappings (e.g., S3 vs COS vs MinIO).
 * <p>
 * This class is often used by Frontend to pass normalized locations and storage metadata to Backend (BE).
 */
public class LocationPath {
    private static final Logger LOG = LogManager.getLogger(LocationPath.class);
    private static final String SCHEME_DELIM = "://";
    private static final String NONSTANDARD_SCHEME_DELIM = ":/";

    /**
     * URI schema, e.g., "s3", "hdfs", "file"
     */
    private final String schema;

    /**
     * Normalized and validated location URI
     */
    private final String normalizedLocation;

    /**
     * Unique filesystem identifier, typically "scheme://authority"
     */
    private final String fsIdentifier;

    /**
     * Storage properties associated with this schema
     */
    private final StorageProperties storageProperties;

    /**
     * Private constructor to enforce creation through the factory method.
     */
    private LocationPath(String schema,
                         String normalizedLocation,
                         String fsIdentifier,
                         StorageProperties storageProperties) {
        this.schema = schema;
        this.normalizedLocation = normalizedLocation;
        this.fsIdentifier = fsIdentifier;
        this.storageProperties = storageProperties;
    }

    private static String parseScheme(String finalLocation) {
        String scheme = "";
        String[] schemeSplit = finalLocation.split(SCHEME_DELIM);
        if (schemeSplit.length > 1) {
            scheme = schemeSplit[0];
        } else {
            schemeSplit = finalLocation.split(NONSTANDARD_SCHEME_DELIM);
            if (schemeSplit.length > 1) {
                scheme = schemeSplit[0];
            }
        }

        // if not get scheme, need consider /path/to/local to no scheme
        if (scheme.isEmpty()) {
            try {
                Paths.get(finalLocation);
            } catch (InvalidPathException exception) {
                throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + finalLocation);
            }
        }

        return scheme;
    }

    /**
     * Static factory method to create a LocationPath instance.
     *
     * @param location             the input URI location string
     * @param storagePropertiesMap map of schema type to corresponding storage properties
     * @return a new LocationPath instance
     * @throws UserException if validation fails or required data is missing
     */
    public static LocationPath of(String location,
                                  Map<StorageProperties.Type, StorageProperties> storagePropertiesMap,
                                  boolean normalize) throws UserException {
        String schema = extractScheme(location);
        String normalizedLocation = location;
        StorageProperties storageProperties = null;
        StorageProperties.Type type = fromSchemaWithContext(location, schema);
        if (StorageProperties.Type.LOCAL.equals(type)) {
            normalize = false;
        }
        if (normalize) {
            storageProperties = findStorageProperties(type, schema, storagePropertiesMap);

            if (storageProperties == null) {
                throw new UserException("No storage properties found for schema: " + schema);
            }
            normalizedLocation = storageProperties.validateAndNormalizeUri(location);
            if (StringUtils.isBlank(normalizedLocation)) {
                throw new IllegalArgumentException("Invalid location: " + location + ", normalized location is null");
            }
        }
        String encodedLocation = encodedLocation(normalizedLocation);
        URI uri = URI.create(encodedLocation);
        String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());

        return new LocationPath(schema, normalizedLocation, fsIdentifier, storageProperties);
    }

    public static StorageProperties.Type fromSchemaWithContext(String location, String schema) {
        if (isHdfsOnOssEndpoint(location)) {
            return StorageProperties.Type.OSS_HDFS;
        }
        return SchemaTypeMapper.fromSchema(schema); // fallback to default
    }

    public static LocationPath of(String location) {
        String schema = extractScheme(location);
        String encodedLocation = encodedLocation(location);
        URI uri = URI.create(encodedLocation);
        String fsIdentifier = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
        return new LocationPath(schema, location, fsIdentifier, null);
    }

    /**
     * Static factory method to create a LocationPath instance.
     *
     * @param location             the input URI location string
     * @param storagePropertiesMap map of schema type to corresponding storage properties
     * @return a new LocationPath instance
     */
    public static LocationPath of(String location,
                                  Map<StorageProperties.Type, StorageProperties> storagePropertiesMap) {
        try {
            return LocationPath.of(location, storagePropertiesMap, true);
        } catch (UserException e) {
            throw new StoragePropertiesException("Failed to create LocationPath for location: " + location, e);
        }
    }

    /**
     * Extracts the URI scheme (e.g., "s3", "hdfs") from the location string.
     *
     * @param location the input URI string
     * @return the extracted scheme
     * @throws IllegalArgumentException if the scheme is missing or URI is malformed
     */
    private static String extractScheme(String location) {
        if (Strings.isNullOrEmpty(location)) {
            return null;
        }
        return parseScheme(location);
    }

    /**
     * Finds the appropriate {@link StorageProperties} configuration for a given storage type and schema.
     * <p>
     * This method attempts to locate the storage properties using the following logic:
     * <p>
     * 1. Direct match by type: Attempts to retrieve the properties from the map using the given {@code type}.
     * 2. S3-Minio fallback: If the requested type is S3 and no properties are found, try to fall back to MinIO
     * configuration,
     * assuming it is compatible with S3.
     * 3. Compatibility fallback based on schema:
     * In older configurations, the schema name might not strictly match the actual storage type.
     * For example, a COS storage might use the "s3" schema, or an S3 storage might use the "cos" schema.
     * To handle such legacy inconsistencies, we try to find any storage configuration with the name "s3"
     * if the schema maps to a file type of FILE_S3.
     *
     * @param type                 the storage type to search for
     * @param schema               the schema string used in the original request (e.g., "s3://bucket/file")
     * @param storagePropertiesMap a map of available storage types to their configuration
     * @return a matching {@link StorageProperties} if found; otherwise, {@code null}
     */
    private static StorageProperties findStorageProperties(StorageProperties.Type type, String schema,
                                                           Map<StorageProperties.Type, StorageProperties>
                                                                   storagePropertiesMap) {
        // Step 1: Try direct match by type
        StorageProperties props = storagePropertiesMap.get(type);
        if (props != null) {
            return props;
        }

        // Step 2: Fallback - if type is S3 and MinIO is configured, assume it's compatible
        if (type == StorageProperties.Type.S3
                && storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) {
            return storagePropertiesMap.get(StorageProperties.Type.MINIO);
        }

        // Step 3: Compatibility fallback based on schema
        // In previous configurations, the schema name may not strictly match the actual storage type.
        // For example, a COS storage might use the "s3" schema, or an S3 storage might use the "cos" schema.
        // To handle such legacy inconsistencies, we try to find a storage configuration whose name is "s3".
        if (TFileType.FILE_S3.equals(SchemaTypeMapper.fromSchemaToFileType(schema))) {
            return storagePropertiesMap.values().stream()
                    .filter(p -> "s3".equalsIgnoreCase(p.getStorageName()))
                    .findFirst()
                    .orElse(null);
        }

        // Not found
        return null;
    }


    private static String encodedLocation(String location) {
        try {
            return URLEncoder.encode(location, StandardCharsets.UTF_8.name())
                    .replace("%2F", "/")
                    .replace("%3A", ":");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Failed to encode location: " + location, e);
        }
    }


    // Return true if this location is with oss-hdfs
    public static boolean isHdfsOnOssEndpoint(String location) {
        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
        return location.contains("oss-dls.aliyuncs");
    }

    /**
     * provide file type for BE.
     *
     * @param location the location is from fs.listFile
     * @return on BE, we will use TFileType to get the suitable client to access storage.
     */
    public static TFileType getTFileTypeForBE(String location) {
        if (StringUtils.isBlank(location)) {
            return null;
        }
        if (isHdfsOnOssEndpoint(location)) {
            return TFileType.FILE_HDFS;
        }
        LocationPath locationPath = LocationPath.of(location);
        return locationPath.getTFileTypeForBE();
    }

    public static String getTempWritePath(String loc, String prefix) {
        Path tempRoot = new Path(loc, prefix);
        Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
        return tempPath.toString();
    }

    public TFileType getTFileTypeForBE() {
        return SchemaTypeMapper.fromSchemaToFileType(schema);
    }

    /**
     * The converted path is used for BE
     *
     * @return BE scan range path
     */
    public Path toStorageLocation() {
        return new Path(normalizedLocation);
    }


    public FileSystemType getFileSystemType() {
        return SchemaTypeMapper.fromSchemaToFileSystemType(schema);
    }


    // Getters (optional, if needed externally)
    public String getSchema() {
        return schema;
    }

    public String getNormalizedLocation() {
        return normalizedLocation;
    }

    public String getFsIdentifier() {
        return fsIdentifier;
    }

    public StorageProperties getStorageProperties() {
        return storageProperties;
    }

    public Path getPath() {
        return new Path(normalizedLocation);
    }
}