DppConfig.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TPriority;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

public class DppConfig implements Writable {
    // necessary hadoop job config keys
    private static final String FS_DEFAULT_NAME = "fs.default.name";
    private static final String MAPRED_JOB_TRACKER = "mapred.job.tracker";
    private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";

    // bos hadoop keys
    private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
    private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
    private static final String FS_BOS_SECRET_ACCESS_KEY = "fs.bos.secret.access.key";

    private static final String SEMICOLON_SEPARATOR = ";";
    private static final String EQUAL_SEPARATOR = "=";
    private static final String COLON_SEPARATOR = ":";

    private static final String APPLICATIONS_PATH = "applications";
    private static final String OUTPUT_PATH = "output";

    public static final String PALO_PATH = "hadoop_palo_path";
    public static final String HTTP_PORT = "hadoop_http_port";
    public static final String HADOOP_CONFIGS = "hadoop_configs";
    public static final String PRIORITY = "priority";

    public static final String CLUSTER_NAME_REGEX = "[a-z][a-z0-9-_]{0,63}";

    private static final int DEFAULT_HTTP_PORT = 8070;

    // palo base path in hadoop
    //   dpp: paloPath/cluster_id/applications/dpp_version
    //   output: paloPath/cluster_id/output
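    // For example, with paloPath "/user/palo" and cluster id 10001 (illustrative values):
    //   dpp:    /user/palo/10001/applications/<dpp_version>
    //   output: /user/palo/10001/output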
    @SerializedName(value = "paloPath")
    private String paloPath;
    @SerializedName(value = "httpPort")
    private int httpPort;
    @SerializedName(value = "hadoopConfigs")
    private Map<String, String> hadoopConfigs;

    // priority for palo internal scheduling;
    // currently used by the etl submit schedule and the download file schedule
    @SerializedName(value = "priority")
    private TPriority priority;

    // for persist
    public DppConfig() {
        this(null, -1, null, null);
    }

    private DppConfig(String paloPath, int httpPort, Map<String, String> hadoopConfigs, TPriority priority) {
        this.paloPath = paloPath;
        this.httpPort = httpPort;
        this.hadoopConfigs = hadoopConfigs;
        this.priority = priority;
    }

    public static DppConfig create(Map<String, String> configMap) throws LoadException {
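        // A sketch of the expected configMap contents (illustrative values, not taken from a real cluster):
        //   hadoop_palo_path = /user/palo
        //   hadoop_http_port = 8070
        //   hadoop_configs   = fs.default.name=hdfs://host:8020;mapred.job.tracker=host:9001;hadoop.job.ugi=<ugi>
        //   priority         = NORMAL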
        String paloPath = null;
        int httpPort = -1;
        Map<String, String> hadoopConfigs = Maps.newHashMap();
        TPriority priority = null;

        for (Map.Entry<String, String> entry : configMap.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();

            if (key.equalsIgnoreCase(PALO_PATH)) {
                // palo path
                if (Strings.isNullOrEmpty(value)) {
                    throw new LoadException("Load cluster " + PALO_PATH + " is null");
                }
                paloPath = value.trim();
            } else if (key.equalsIgnoreCase(HTTP_PORT)) {
                // http port
                try {
                    httpPort = Integer.parseInt(value);
                } catch (NumberFormatException e) {
                    throw new LoadException("Load cluster " + HTTP_PORT + " is not INT");
                }
            } else if (key.equalsIgnoreCase(HADOOP_CONFIGS)) {
                // hadoop configs
                if (Strings.isNullOrEmpty(value)) {
                    throw new LoadException("Load cluster " + HADOOP_CONFIGS + " is null");
                }

                value = value.trim();
                for (String config : value.split(SEMICOLON_SEPARATOR)) {
                    config = config.trim();
                    if (config.equals("")) {
                        continue;
                    }

                    String[] keyValueArr = config.split(EQUAL_SEPARATOR);
                    if (keyValueArr.length != 2) {
                        throw new LoadException("Load cluster " + HADOOP_CONFIGS + " format error");
                    }

                    hadoopConfigs.put(keyValueArr[0], keyValueArr[1]);
                }
            } else if (key.equalsIgnoreCase(PRIORITY)) {
                try {
                    priority = TPriority.valueOf(value);
                } catch (Exception e) {
                    throw new LoadException("Load cluster " + PRIORITY + " format error");
                }
            } else {
                throw new LoadException("Unknown load cluster config key: " + key);
            }
        }

        if (hadoopConfigs.isEmpty()) {
            hadoopConfigs = null;
        }

        return new DppConfig(paloPath, httpPort, hadoopConfigs, priority);
    }

    public void update(DppConfig dppConfig) {
        update(dppConfig, false);
    }

    public void update(DppConfig dppConfig, boolean needReplace) {
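        // Merge non-default fields from dppConfig into this config. When needReplace is true,
        // a non-empty hadoopConfigs replaces the current map wholesale; otherwise its entries
        // are merged key by key into the existing map.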
        if (dppConfig == null) {
            return;
        }

        if (dppConfig.paloPath != null) {
            paloPath = dppConfig.paloPath;
        }

        if (dppConfig.httpPort != -1) {
            httpPort = dppConfig.httpPort;
        }

        if (dppConfig.hadoopConfigs != null) {
            if (needReplace) {
                if (!dppConfig.hadoopConfigs.isEmpty()) {
                    hadoopConfigs = dppConfig.hadoopConfigs;
                }
            } else {
                if (hadoopConfigs == null) {
                    hadoopConfigs = Maps.newHashMap();
                }
                hadoopConfigs.putAll(dppConfig.hadoopConfigs);
            }
        }

        if (dppConfig.priority != null) {
            priority = dppConfig.priority;
        }
    }

    public void updateHadoopConfigs(Map<String, String> configMap) throws LoadException {
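        // Copy BOS properties from the load statement into hadoopConfigs, mapping
        // LoadStmt.BOS_ENDPOINT / BOS_ACCESSKEY / BOS_SECRET_ACCESSKEY to the fs.bos.* keys.
        // All three BOS parameters must be supplied together.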
        if (configMap == null) {
            return;
        }

        if (hadoopConfigs == null) {
            hadoopConfigs = Maps.newHashMap();
        }

        // bos configs
        int bosParameters = 0;
        if (configMap.containsKey(LoadStmt.BOS_ENDPOINT)) {
            bosParameters++;
            String bosEndpoint = configMap.get(LoadStmt.BOS_ENDPOINT);
            hadoopConfigs.put(FS_BOS_ENDPOINT, bosEndpoint);
        }

        if (configMap.containsKey(LoadStmt.BOS_ACCESSKEY)) {
            bosParameters++;
            String bosAccessKey = configMap.get(LoadStmt.BOS_ACCESSKEY);
            hadoopConfigs.put(FS_BOS_ACCESS_KEY, bosAccessKey);
        }

        if (configMap.containsKey(LoadStmt.BOS_SECRET_ACCESSKEY)) {
            bosParameters++;
            String bosSecretAccessKey = configMap.get(LoadStmt.BOS_SECRET_ACCESSKEY);
            hadoopConfigs.put(FS_BOS_SECRET_ACCESS_KEY, bosSecretAccessKey);
        }

        if (bosParameters > 0 && bosParameters != 3) {
            throw new LoadException("You should specify 3 parameters (" + LoadStmt.BOS_ENDPOINT + ", "
                    + LoadStmt.BOS_ACCESSKEY + ", " + LoadStmt.BOS_SECRET_ACCESSKEY
                    + ") when loading data from BOS");
        }

        if (hadoopConfigs.isEmpty()) {
            hadoopConfigs = null;
        }
    }

    public void resetConfigByKey(String key) throws LoadException {
        if (key.equalsIgnoreCase(PALO_PATH)) {
            paloPath = null;
        } else if (key.equalsIgnoreCase(HTTP_PORT)) {
            httpPort = DEFAULT_HTTP_PORT;
        } else if (key.equalsIgnoreCase(HADOOP_CONFIGS)) {
            hadoopConfigs = null;
        } else if (key.equalsIgnoreCase(PRIORITY)) {
            priority = TPriority.NORMAL;
        } else {
            throw new LoadException("Unknown load cluster config key: " + key);
        }
    }

    public void clear() {
        if (hadoopConfigs == null) {
            return;
        }

        // retain only the 3 necessary configs needed for clearing the dpp output
        Iterator<Map.Entry<String, String>> iter = hadoopConfigs.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, String> entry = iter.next();
            String key = entry.getKey();
            if (!key.equals(FS_DEFAULT_NAME) && !key.equals(MAPRED_JOB_TRACKER) && !key.equals(HADOOP_JOB_UGI)) {
                iter.remove();
            }
        }
    }

    public void check() throws LoadException {
        if (Strings.isNullOrEmpty(paloPath)) {
            throw new LoadException("Load cluster " + PALO_PATH + " is null");
        }

        if (httpPort == -1) {
            // use default
            httpPort = DEFAULT_HTTP_PORT;
        }

        // check necessary hadoop configs
        if (hadoopConfigs == null) {
            throw new LoadException("Load cluster " + HADOOP_CONFIGS + " is null");
        }
        if (!hadoopConfigs.containsKey(FS_DEFAULT_NAME)) {
            throw new LoadException("Load cluster " + FS_DEFAULT_NAME + " not set");
        }
        if (!hadoopConfigs.containsKey(MAPRED_JOB_TRACKER)) {
            throw new LoadException("Load cluster " + MAPRED_JOB_TRACKER + " not set");
        }
        if (!hadoopConfigs.containsKey(HADOOP_JOB_UGI)) {
            throw new LoadException("Load cluster " + HADOOP_JOB_UGI + " not set");
        }
    }

    public DppConfig getCopiedDppConfig() {
        Map<String, String> copiedHadoopConfigs = null;
        if (hadoopConfigs != null) {
            copiedHadoopConfigs = Maps.newHashMap(hadoopConfigs);
        }

        return new DppConfig(paloPath, httpPort, copiedHadoopConfigs, priority);
    }

    public static String getPaloPathKey() {
        return PALO_PATH;
    }

    public String getPaloPath() {
        return paloPath;
    }

    public String getApplicationsPath() {
        return String.format("%s/%d/%s/%s", paloPath, Env.getCurrentEnv().getClusterId(), APPLICATIONS_PATH,
                FeConstants.dpp_version);
    }

    public String getOutputPath() {
        return String.format("%s/%d/%s", paloPath, Env.getCurrentEnv().getClusterId(), OUTPUT_PATH);
    }

    public static String getHttpPortKey() {
        return HTTP_PORT;
    }

    public int getHttpPort() {
        if (httpPort == -1) {
            return DEFAULT_HTTP_PORT;
        } else {
            return httpPort;
        }
    }

    public String getFsDefaultName() {
        return hadoopConfigs.get(FS_DEFAULT_NAME);
    }

    public String getNameNodeHost() {
        // hdfs://host:port
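        // e.g. "hdfs://nn.example.com:8020" splits on ':' into ["hdfs", "//nn.example.com", "8020"],
        // so arr[1].substring(2) yields "nn.example.com" (illustrative host name)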
        String fsDefaultName = hadoopConfigs.get(FS_DEFAULT_NAME);
        String[] arr = fsDefaultName.split(COLON_SEPARATOR);
        if (arr.length != 3) {
            return null;
        }

        return arr[1].substring(2);
    }

    public String getHadoopJobUgiStr() {
        return hadoopConfigs.get(HADOOP_JOB_UGI);
    }

    public static String getHadoopConfigsKey() {
        return HADOOP_CONFIGS;
    }

    public Map<String, String> getHadoopConfigs() {
        return hadoopConfigs;
    }

    public static String getPriorityKey() {
        return PRIORITY;
    }

    public TPriority getPriority() {
        if (priority == null) {
            return TPriority.NORMAL;
        }

        return priority;
    }

    @Override
    public String toString() {
        return "DppConfig{paloPath=" + paloPath + ", httpPort=" + httpPort + ", hadoopConfigs=" + hadoopConfigs + "}";
    }

    @Override
    public void write(DataOutput out) throws IOException {
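        // Serialized layout (kept in sync with readFields):
        //   [hasPaloPath:boolean][paloPath:Text if present][httpPort:int]
        //   [hasHadoopConfigs:boolean][size:int][key:Text value:Text ...][priority:Text]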
        if (paloPath != null) {
            out.writeBoolean(true);
            Text.writeString(out, paloPath);
        } else {
            out.writeBoolean(false);
        }

        out.writeInt(httpPort);

        if (hadoopConfigs != null) {
            out.writeBoolean(true);
            out.writeInt(hadoopConfigs.size());
            for (Map.Entry<String, String> entry : hadoopConfigs.entrySet()) {
                Text.writeString(out, entry.getKey());
                Text.writeString(out, entry.getValue());
            }
        } else {
            out.writeBoolean(false);
        }

        if (priority == null) {
            priority = TPriority.NORMAL;
        }
        Text.writeString(out, priority.name());
    }

    public void readFields(DataInput in) throws IOException {
        if (in.readBoolean()) {
            paloPath = Text.readString(in);
        }

        httpPort = in.readInt();

        if (in.readBoolean()) {
            hadoopConfigs = Maps.newHashMap();
            int count = in.readInt();
            for (int i = 0; i < count; ++i) {
                hadoopConfigs.put(Text.readString(in), Text.readString(in));
            }
        }

        this.priority = TPriority.valueOf(Text.readString(in));
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DppConfig)) {
            return false;
        }

        DppConfig other = (DppConfig) obj;
        return httpPort == other.httpPort
                && Objects.equals(paloPath, other.paloPath)
                && Objects.equals(hadoopConfigs, other.hadoopConfigs)
                && priority == other.priority;
    }
}