// LocationPath.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fsv2.FileSystemType;
import org.apache.doris.thrift.TFileType;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

public class LocationPath {
    private static final Logger LOG = LogManager.getLogger(LocationPath.class);
    private static final String SCHEME_DELIM = "://";
    private static final String NONSTANDARD_SCHEME_DELIM = ":/";
    private static final String STANDARD_HDFS_PREFIX = "hdfs://";
    private static final String EMPTY_HDFS_PREFIX = "hdfs:///";
    private static final String BROKEN_HDFS_PREFIX = "hdfs:/";
    private final Scheme scheme;
    private final String location;
    private final boolean isBindBroker;

    public enum Scheme {
        HDFS,
        LOCAL, // Local File
        BOS, // Baidu
        GCS, // Google
        OBS, // Huawei
        COS, // Tencent
        COSN, // Tencent
        OFS, // Tencent CHDFS
        GFS, // Tencent GooseFS
        LAKEFS, // used by Tencent DLC
        OSS, // Alibaba
        OSS_HDFS, // JindoFS on OSS
        JFS, // JuiceFS
        S3,
        S3A,
        S3N,
        VIEWFS,
        UNKNOWN,
        NOSCHEME // no scheme info
    }

    @VisibleForTesting
    public LocationPath(String location) {
        this(location, Maps.newHashMap(), true);
    }

    public LocationPath(String location, Map<String, String> props) {
        this(location, props, true);
    }

    public LocationPath(String originLocation, Map<String, String> props, boolean convertPath) {
        isBindBroker = props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME);
        String tmpLocation = originLocation;
        if (!(originLocation.contains(SCHEME_DELIM) || originLocation.contains(NONSTANDARD_SCHEME_DELIM))) {
            // Sometimes the file path does not contain a scheme, so we need to prepend the default fs,
            // e.g. /path/to/file.parquet -> hdfs://nn/path/to/file.parquet.
            // The default fs comes from the catalog properties.
            String defaultFS = props.getOrDefault(HdfsResource.HADOOP_FS_NAME, "");
            tmpLocation = defaultFS + originLocation;
        }
        String scheme = parseScheme(tmpLocation).toLowerCase();
        switch (scheme) {
            case "":
                this.scheme = Scheme.NOSCHEME;
                break;
            case FeConstants.FS_PREFIX_HDFS:
                this.scheme = Scheme.HDFS;
                // Need to add the HDFS host to the location.
                String host = props.get(HdfsResource.DSF_NAMESERVICES);
                boolean enableOssRootPolicy = props.getOrDefault(ExternalCatalog.OOS_ROOT_POLICY, "false")
                        .equals("true");
                tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host, enableOssRootPolicy) : tmpLocation;
                break;
            case FeConstants.FS_PREFIX_S3:
                this.scheme = Scheme.S3;
                break;
            case FeConstants.FS_PREFIX_S3A:
                this.scheme = Scheme.S3A;
                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                break;
            case FeConstants.FS_PREFIX_S3N:
                // Also handles tables with mixed locations, e.g. both s3 and hdfs paths in one table.
                this.scheme = Scheme.S3N;
                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                break;
            case FeConstants.FS_PREFIX_BOS:
                this.scheme = Scheme.BOS;
                // use s3 client to access
                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                break;
            case FeConstants.FS_PREFIX_GCS:
                this.scheme = Scheme.GCS;
                // use s3 client to access
                tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                break;
            case FeConstants.FS_PREFIX_OSS:
                String endpoint = "";
                if (props.containsKey(OssProperties.ENDPOINT)) {
                    endpoint = props.get(OssProperties.ENDPOINT);
                    if (endpoint.startsWith(OssProperties.OSS_PREFIX)) {
                        // The endpoint may carry an "oss." prefix, e.g. oss.oss-cn-beijing.aliyuncs.com
                        endpoint = endpoint.replace(OssProperties.OSS_PREFIX, "");
                    }
                } else if (props.containsKey(S3Properties.ENDPOINT)) {
                    endpoint = props.get(S3Properties.ENDPOINT);
                } else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
                    endpoint = props.get(S3Properties.Env.ENDPOINT);
                }
                if (isHdfsOnOssEndpoint(endpoint)) {
                    this.scheme = Scheme.OSS_HDFS;
                } else {
                    if (useS3EndPoint(props)) {
                        tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                    }
                    this.scheme = Scheme.OSS;
                }
                break;
            case FeConstants.FS_PREFIX_COS:
                if (useS3EndPoint(props)) {
                    tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                }
                this.scheme = Scheme.COS;
                break;
            case FeConstants.FS_PREFIX_OBS:
                if (useS3EndPoint(props)) {
                    tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
                }
                this.scheme = Scheme.OBS;
                break;
            case FeConstants.FS_PREFIX_OFS:
                this.scheme = Scheme.OFS;
                break;
            case FeConstants.FS_PREFIX_JFS:
                this.scheme = Scheme.JFS;
                break;
            case FeConstants.FS_PREFIX_GFS:
                this.scheme = Scheme.GFS;
                break;
            case FeConstants.FS_PREFIX_COSN:
                // Treating cosn (Tencent hadoop-cos) as an s3 file system may cause compatibility issues.
                this.scheme = Scheme.COSN;
                break;
            case FeConstants.FS_PREFIX_LAKEFS:
                // lakefs (used by Tencent DLC) follows the same cosn/s3 access path on BE.
                this.scheme = Scheme.COSN;
                tmpLocation = normalizedLakefsPath(tmpLocation);
                break;
            case FeConstants.FS_PREFIX_VIEWFS:
                this.scheme = Scheme.VIEWFS;
                break;
            case FeConstants.FS_PREFIX_FILE:
                this.scheme = Scheme.LOCAL;
                break;
            default:
                this.scheme = Scheme.UNKNOWN;
                break;
        }
        this.location = tmpLocation;
    }
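
    // Illustrative examples of how the constructor normalizes locations. The inputs and the
    // configured defaultFS / nameservice values are assumptions, not from this codebase:
    //   new LocationPath("/warehouse/t", props)      -> "hdfs://nn/warehouse/t"  (defaultFS prepended)
    //   new LocationPath("hdfs:/warehouse/t", props) -> "hdfs://ns1/warehouse/t" (nameservice host added)
    //   new LocationPath("s3a://bucket/key", props)  -> "s3://bucket/key"        (converted for the s3 client)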

    // Returns true if this location is served by the OSS-HDFS (JindoFS) service.
    public static boolean isHdfsOnOssEndpoint(String location) {
        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
        return location.contains("oss-dls.aliyuncs");
    }
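
    // For example (illustrative, based on the Alibaba Cloud docs linked above):
    //   isHdfsOnOssEndpoint("cn-shanghai.oss-dls.aliyuncs.com") -> true
    //   isHdfsOnOssEndpoint("oss-cn-beijing.aliyuncs.com")      -> false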

    // Return the file system type and the file system identity.
    // The file system identity is the scheme and authority of the URI, e.g. "hdfs://host:port" or "s3://bucket".
    public static Pair<FileSystemType, String> getFSIdentity(String location,
            Map<String, String> properties, String bindBrokerName) {
        LocationPath locationPath = new LocationPath(location, properties, true);
        FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
        URI uri = locationPath.getPath().toUri();
        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
        return Pair.of(fsType, fsIdent);
    }
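
    // Illustrative results (the inputs and the "props" map are assumptions):
    //   getFSIdentity("hdfs://nn:8020/warehouse/t", props, null) -> Pair.of(FileSystemType.HDFS, "hdfs://nn:8020")
    //   getFSIdentity("s3a://bucket/key", props, "broker1")      -> Pair.of(FileSystemType.BROKER, "s3://bucket")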

    /**
     * Provide the file type for BE.
     *
     * @param location the location returned by fs.listFile
     * @return the TFileType that BE uses to pick a suitable client to access the storage
     */
    public static TFileType getTFileTypeForBE(String location) {
        if (location == null || location.isEmpty()) {
            return null;
        }
        LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
        return locationPath.getTFileTypeForBE();
    }

    public static String getTempWritePath(String loc, String prefix) {
        Path tempRoot = new Path(loc, prefix);
        Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
        return tempPath.toString();
    }
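
    // Example (the prefix is a hypothetical staging dir; the UUID suffix is random):
    //   getTempWritePath("hdfs://nn/warehouse/t", ".doris_tmp")
    //     -> "hdfs://nn/warehouse/t/.doris_tmp/<32-hex-uuid>"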

    public TFileType getTFileTypeForBE() {
        switch (scheme) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
                // ATTN: COSN is accessed via HadoopFS on the FE side, but via the S3 client on BE.
            case COSN:
            case LAKEFS:
                // For now we only support the S3 client for object storage on BE.
                return TFileType.FILE_S3;
            case HDFS:
            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
            case VIEWFS:
                return TFileType.FILE_HDFS;
            case GFS:
            case JFS:
            case OFS:
                return TFileType.FILE_BROKER;
            case LOCAL:
                return TFileType.FILE_LOCAL;
            default:
                return null;
        }
    }
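
    // e.g. (illustrative, using the testing constructor):
    //   new LocationPath("cosn://bucket/key").getTFileTypeForBE() -> TFileType.FILE_S3
    //   new LocationPath("hdfs://nn/path").getTFileTypeForBE()    -> TFileType.FILE_HDFS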

    /**
     * The converted path is used by BE.
     *
     * @return BE scan range path
     */
    public Path toStorageLocation() {
        switch (scheme) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
            case COSN:
                // All object storage is accessed via the s3 client on BE, so convert the scheme to s3.
                return new Path(convertToS3(location));
            case HDFS:
            case OSS_HDFS:
            case VIEWFS:
            case GFS:
            case JFS:
            case OFS:
            case LOCAL:
            default:
                return getPath();
        }
    }
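
    // Illustrative: a COSN location is rewritten so BE's s3 client can read it, while an
    // HDFS location passes through unchanged (inputs are assumptions):
    //   new LocationPath("cosn://bucket/key").toStorageLocation() -> Path("s3://bucket/key")
    //   new LocationPath("hdfs://nn/path").toStorageLocation()    -> Path("hdfs://nn/path")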

    public Scheme getScheme() {
        return scheme;
    }

    public String get() {
        return location;
    }

    public Path getPath() {
        return new Path(location);
    }

    public boolean isBindBroker() {
        return isBindBroker;
    }

    private static String parseScheme(String finalLocation) {
        String scheme = "";
        String[] schemeSplit = finalLocation.split(SCHEME_DELIM);
        if (schemeSplit.length > 1) {
            scheme = schemeSplit[0];
        } else {
            schemeSplit = finalLocation.split(NONSTANDARD_SCHEME_DELIM);
            if (schemeSplit.length > 1) {
                scheme = schemeSplit[0];
            }
        }

        // If no scheme was found, make sure the location is at least a valid scheme-less local path.
        if (scheme.isEmpty()) {
            try {
                Paths.get(finalLocation);
            } catch (InvalidPathException exception) {
                throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + finalLocation);
            }
        }

        return scheme;
    }
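
    // Illustrative behavior (inputs are assumptions):
    //   parseScheme("hdfs://nn:8020/path") -> "hdfs"
    //   parseScheme("hdfs:/path")          -> "hdfs"  (nonstandard "scheme:/" form)
    //   parseScheme("/path/to/local")      -> ""      (validated as a scheme-less path)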

    private boolean useS3EndPoint(Map<String, String> props) {
        if (props.containsKey(ObsProperties.ENDPOINT)
                || props.containsKey(OssProperties.ENDPOINT)
                || props.containsKey(CosProperties.ENDPOINT)) {
            return false;
        }
        // Check a wide range of keys for compatibility with s3 properties.
        return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
    }
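
    // e.g. (the literal keys shown are assumed values of the property constants):
    //   props = {"s3.endpoint": "..."}  -> true  (only a generic s3 endpoint is present)
    //   props = {"oss.endpoint": "..."} -> false (a vendor-specific endpoint takes precedence)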

    /**
     * The converted path is used by FE to get metadata.
     * Replaces the original scheme with s3, e.g. oss://bucket/key -> s3://bucket/key.
     *
     * @param location origin location
     * @return metadata location path. Only convert when the storage is compatible with the s3 client.
     */
    private static String convertToS3(String location) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("try to convert location to the s3 scheme: {}", location);
        }
        int pos = findDomainPos(location);
        return "s3" + location.substring(pos);
    }
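
    // For example (illustrative): convertToS3("oss://bucket/key") -> "s3://bucket/key"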

    // Returns the index of the "://" scheme delimiter in the location.
    private static int findDomainPos(String rangeLocation) {
        int pos = rangeLocation.indexOf("://");
        if (pos == -1) {
            throw new RuntimeException("No '://' found in location: " + rangeLocation);
        }
        return pos;
    }

    @VisibleForTesting
    public static String normalizedHdfsPath(String location, String host, boolean enableOssRootPolicy) {
        try {
            // Hive partitions may contain special characters such as ' ', '<', '>' and so on.
            // These characters need to be encoded before creating the URI.
            // But don't encode '/' and ':', so that we can still get the correct URI host.
            String newLocation = URLEncoder.encode(location, StandardCharsets.UTF_8.name()).replace("%2F", "/")
                    .replace("%3A", ":");
            URI normalizedUri = new URI(newLocation).normalize();
            // compatible with 'hdfs:///' or 'hdfs:/'
            if (StringUtils.isEmpty(normalizedUri.getHost())) {
                newLocation = URLDecoder.decode(newLocation, StandardCharsets.UTF_8.name());
                if (newLocation.startsWith(BROKEN_HDFS_PREFIX) && !newLocation.startsWith(STANDARD_HDFS_PREFIX)) {
                    newLocation = newLocation.replace(BROKEN_HDFS_PREFIX, STANDARD_HDFS_PREFIX);
                }
                if (StringUtils.isNotEmpty(host)) {
                    // Replace 'hdfs://key/' with 'hdfs://name_service/key/',
                    // or 'hdfs:///abc' with 'hdfs://name_service/abc'.
                    if (newLocation.startsWith(EMPTY_HDFS_PREFIX)) {
                        return newLocation.replace(STANDARD_HDFS_PREFIX, STANDARD_HDFS_PREFIX + host);
                    } else {
                        return newLocation.replace(STANDARD_HDFS_PREFIX, STANDARD_HDFS_PREFIX + host + "/");
                    }
                } else {
                    // 'hdfs://null/' is equivalent to 'hdfs:///'.
                    if (newLocation.startsWith(EMPTY_HDFS_PREFIX)) {
                        // Do not support hdfs:///location
                        throw new RuntimeException("Invalid location with empty host: " + newLocation);
                    } else {
                        if (enableOssRootPolicy) {
                            // if oss root policy is enabled, the path should be like:
                            // hdfs://customized_host/path/to/file
                            // Should remain unchanged.
                            return newLocation;
                        } else {
                            // Replace 'hdfs://key/' with '/key/' and try to access the local NameNode on BE.
                            return newLocation.replace(STANDARD_HDFS_PREFIX, "/");
                        }
                    }
                }
            }
            return URLDecoder.decode(newLocation, StandardCharsets.UTF_8.name());
        } catch (URISyntaxException | UnsupportedEncodingException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
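
    // Illustrative normalization (host = "ns1" is an assumed nameservice):
    //   normalizedHdfsPath("hdfs:///warehouse/t", "ns1", false) -> "hdfs://ns1/warehouse/t"
    //   normalizedHdfsPath("hdfs:/warehouse/t", "ns1", false)   -> "hdfs://ns1/warehouse/t"
    //   normalizedHdfsPath("hdfs:/warehouse/t", "", false)      -> "/warehouse/t"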

    private static String normalizedLakefsPath(String location) {
        int atIndex = location.indexOf("@dlc");
        if (atIndex != -1) {
            return "lakefs://" + location.substring(atIndex + 1);
        } else {
            return location;
        }
    }
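
    // Illustrative (the exact "@dlc" location format is an assumption):
    //   normalizedLakefsPath("lakefs://bucket@dlc-region/path") -> "lakefs://dlc-region/path"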

    public FileSystemType getFileSystemType() {
        FileSystemType fsType;
        switch (scheme) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
                // All object storage is accessed via the s3 client on BE, so convert the scheme to s3.
                fsType = FileSystemType.S3;
                break;
            case COSN:
                // COSN uses the s3 client on the FE side because it needs to complete multi-part uploads there.
                fsType = FileSystemType.S3;
                break;
            case OFS:
                // ofs:// uses the underlying file system: Tencent Cloud HDFS (aka CHDFS).
                fsType = FileSystemType.OFS;
                break;
            case HDFS:
            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
            case VIEWFS:
            case GFS:
                fsType = FileSystemType.HDFS;
                break;
            case JFS:
                fsType = FileSystemType.JFS;
                break;
            case LOCAL:
                fsType = FileSystemType.FILE;
                break;
            default:
                throw new UnsupportedOperationException("Unknown file system for location: " + location);
        }
        return fsType;
    }

    @Override
    public String toString() {
        return get();
    }
}