CopyProperties.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;

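/**
 * Holds and validates the properties of a COPY INTO statement. Property keys are grouped by prefix:
 * "file." keys describe the input files (type, compression, column separator, JSON options), while
 * "copy." keys control the copy job itself (size_limit, async, on_error, strict_mode, force, ...).
 *
 * A minimal, hypothetical usage sketch (the empty key prefix and the literal key names below are
 * assumptions for illustration; the real keys are built from the constants in this class and in LoadStmt):
 *
 * <pre>
 * Map&lt;String, String&gt; props = new HashMap&lt;&gt;();
 * props.put("file.type", "csv");
 * props.put("copy.async", "false");
 * props.put("copy.on_error", "continue");
 * CopyProperties copyProperties = new CopyProperties(props, "");
 * String fileType = copyProperties.getFileType(); // "csv"
 * boolean async = copyProperties.isAsync();       // false
 * </pre>
 */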
public class CopyProperties {
    protected Map<String, String> properties;
    protected String prefix;

    private static final String FILE_PREFIX = "file.";
    // properties for type, compression, column_separator
    public static final String TYPE = FILE_PREFIX + "type";
    public static final String COMPRESSION = FILE_PREFIX + "compression";
    public static final String COLUMN_SEPARATOR = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR;
    // properties for data desc
    public static final String LINE_DELIMITER = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_LINE_DELIMITER;
    public static final String PARAM_STRIP_OUTER_ARRAY = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY;
    public static final String PARAM_FUZZY_PARSE = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_FUZZY_PARSE;
    public static final String PARAM_NUM_AS_STRING = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_NUM_AS_STRING;
    public static final String PARAM_JSONPATHS = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_JSONPATHS;
    public static final String PARAM_JSONROOT = FILE_PREFIX + LoadStmt.KEY_IN_PARAM_JSONROOT;

    public static final String COPY_PREFIX = "copy.";
    // properties for size_limit, async, on_error
    public static final String SIZE_LIMIT = COPY_PREFIX + "size_limit";
    public static final String ASYNC = COPY_PREFIX + "async";
    public static final String ON_ERROR = COPY_PREFIX + "on_error";
    public static final String ON_ERROR_CONTINUE = "continue";
    public static final String ON_ERROR_ABORT_STATEMENT = "abort_statement";
    public static final String ON_ERROR_MAX_FILTER_RATIO = LoadStmt.MAX_FILTER_RATIO_PROPERTY + "_";
    public static final String STRICT_MODE = COPY_PREFIX + LoadStmt.STRICT_MODE;
    public static final String LOAD_PARALLELISM = COPY_PREFIX + LoadStmt.LOAD_PARALLELISM;
    // If 'copy.force' is true, files are loaded into the table without checking whether they have already been
    // loaded, and the copy job is not recorded in the meta service. As a result, the same file may be copied
    // into a table multiple times.
    public static final String FORCE = COPY_PREFIX + "force";
    public static final String USE_DELETE_SIGN = COPY_PREFIX + "use_delete_sign";

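    /**
     * Copies the given properties into an internal map so that later analysis does not mutate the caller's map.
     * The prefix is prepended to every property key on lookup, see {@link #addKeyPrefix(String)}.
     */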
    public CopyProperties(Map<String, String> properties, String prefix) {
        this.properties = new HashMap<>(properties);
        this.prefix = prefix;
    }

    protected void analyzeTypeAndCompression() throws AnalysisException {
        // Analyze type and compression: see {@link BrokerScanNode#formatType}, compression is only supported
        // for CSV and JSON file types.
        String compression = properties.get(addKeyPrefix(COMPRESSION));
        String type = properties.get(addKeyPrefix(TYPE));
        if (!StringUtils.isEmpty(compression) && !isTypeEmpty(type) && !(type.equalsIgnoreCase("csv")
                || type.equalsIgnoreCase("json"))) {
            throw new AnalysisException(
                    "Compression is only supported for CSV or JSON file types, but the input type is " + type);
        }
    }

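    // Validates that the SIZE_LIMIT property, if present, parses as a long; the value is read via getSizeLimit().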
    protected void analyzeSizeLimit() throws AnalysisException {
        String key = addKeyPrefix(SIZE_LIMIT);
        if (properties.containsKey(key)) {
            String value = properties.get(key);
            try {
                Long.parseLong(value);
            } catch (Exception e) {
                throw new AnalysisException("Property " + key + " with invalid value " + value);
            }
        }
    }

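    // Validates that the LOAD_PARALLELISM property, if present, parses as an integer.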
    protected void analyzeLoadParallelism() throws AnalysisException {
        String key = addKeyPrefix(LOAD_PARALLELISM);
        if (properties.containsKey(key)) {
            String value = properties.get(key);
            try {
                Integer.parseInt(value);
            } catch (Exception e) {
                throw new AnalysisException("Property " + key + " with invalid value " + value);
            }
        }
    }

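    // Validates the ON_ERROR property. Accepted values are ON_ERROR_CONTINUE, ON_ERROR_ABORT_STATEMENT,
    // or ON_ERROR_MAX_FILTER_RATIO followed by a ratio in [0, 1].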
    protected void analyzeOnError() throws AnalysisException {
        String key = addKeyPrefix(ON_ERROR);
        if (properties.containsKey(key)) {
            String value = properties.get(key);
            if (value.startsWith(ON_ERROR_MAX_FILTER_RATIO)) {
                double maxFilterRatio;
                try {
                    maxFilterRatio = getMaxFilterRatio();
                } catch (NumberFormatException e) {
                    throw new AnalysisException("Property " + key + " with invalid value " + value);
                }
                if (maxFilterRatio < 0 || maxFilterRatio > 1) {
                    throw new AnalysisException("max_filter_ratio must be in [0, 1]");
                }
            } else if (!value.equalsIgnoreCase(ON_ERROR_CONTINUE) && !value.equalsIgnoreCase(
                    ON_ERROR_ABORT_STATEMENT)) {
                throw new AnalysisException("Property " + key + " with invalid value " + value);
            }
        }
    }

    protected void analyzeAsync() throws AnalysisException {
        analyzeBooleanProperty(ASYNC);
    }

    protected void analyzeStrictMode() throws AnalysisException {
        analyzeBooleanProperty(STRICT_MODE);
    }

    protected void analyzeForce() throws AnalysisException {
        analyzeBooleanProperty(FORCE);
    }

    protected void analyzeUseDeleteSign() throws AnalysisException {
        analyzeBooleanProperty(USE_DELETE_SIGN);
    }

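    // Validates that the given property, if present, is either "true" or "false" (case-insensitive).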
    protected void analyzeBooleanProperty(String keyWithoutPrefix) throws AnalysisException {
        String key = addKeyPrefix(keyWithoutPrefix);
        if (properties.containsKey(key)) {
            String value = properties.get(key);
            if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
                throw new AnalysisException("Property " + key + " with invalid value " + value);
            }
        }
    }

    /**
     * @return the size limit, note that 0 means no limit
     */
    public long getSizeLimit() {
        String key = addKeyPrefix(SIZE_LIMIT);
        if (properties.containsKey(key)) {
            return Long.parseLong(properties.get(key));
        }
        return 0;
    }

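    /**
     * @return the max filter ratio derived from the ON_ERROR property: the parsed ratio for
     *         ON_ERROR_MAX_FILTER_RATIO values, 1 for ON_ERROR_CONTINUE, and 0 otherwise
     *         (including when the property is absent)
     */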
    public double getMaxFilterRatio() {
        String key = addKeyPrefix(ON_ERROR);
        if (properties.containsKey(key)) {
            String value = properties.get(key);
            if (value.startsWith(ON_ERROR_MAX_FILTER_RATIO)) {
                return Double.parseDouble(value.substring(ON_ERROR_MAX_FILTER_RATIO.length()));
            } else if (value.equalsIgnoreCase(ON_ERROR_CONTINUE)) {
                return 1;
            } else {
                return 0;
            }
        }
        return 0;
    }

    public String getFileType() {
        // Uses ExternalFileScanNode instead of BrokerScanNode, see {@link LoadScanProvider#formatType}.
        // If the file format type is already set on the stage and COPY INTO needs to override it,
        // the type can be set to "null" so that this method returns null.
        String type = properties.get(addKeyPrefix(TYPE));
        return isTypeEmpty(type) ? null : type;
    }

    public String getFileTypeIgnoreCompression() {
        return properties.get(addKeyPrefix(TYPE));
    }

    public String getCompression() {
        return properties.get(addKeyPrefix(COMPRESSION));
    }

    public String getColumnSeparator() {
        return properties.get(addKeyPrefix(COLUMN_SEPARATOR));
    }

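    // ASYNC defaults to true when not specified.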
    public boolean isAsync() {
        String key = addKeyPrefix(ASYNC);
        return properties.containsKey(key) ? Boolean.parseBoolean(properties.get(key)) : true;
    }

    public boolean isForce() {
        String key = addKeyPrefix(FORCE);
        return properties.containsKey(key) ? Boolean.parseBoolean(properties.get(key)) : false;
    }

    public boolean useDeleteSign() {
        String key = addKeyPrefix(USE_DELETE_SIGN);
        return properties.containsKey(key) ? Boolean.parseBoolean(properties.get(key)) : false;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("(").append(new PrintableMap<>(properties, "=", true, false)).append(") ");
        return sb.toString();
    }

    private boolean isTypeEmpty(String type) {
        return StringUtils.isEmpty(type) || type.equalsIgnoreCase("null");
    }

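    // Prepends the prefix given in the constructor to a property key before lookup.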
    protected String addKeyPrefix(String key) {
        return prefix + key;
    }

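    // Strips the "file." or "copy." prefix from a property key, if present.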
    protected String removeFilePrefix(String key) {
        if (key.startsWith(FILE_PREFIX)) {
            return key.substring(FILE_PREFIX.length());
        } else if (key.startsWith(COPY_PREFIX)) {
            return key.substring(COPY_PREFIX.length());
        }
        return key;
    }
}