LoadErrorHub.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

public abstract class LoadErrorHub {

    public static class MysqlParam implements Writable {
        @SerializedName("h")
        private String host;
        @SerializedName("p")
        private int port;
        @SerializedName("u")
        private String user;
        @SerializedName("pwd")
        private String passwd;
        @SerializedName("db")
        private String db;
        @SerializedName("tb")
        private String table;

        public MysqlParam() {
            host = "";
            port = 0;
            user = "";
            passwd = "";
            db = "";
            table = "";
        }

        public MysqlParam(String host, int port, String user, String passwd, String db, String table) {
            this.host = host;
            this.port = port;
            this.user = user;
            this.passwd = passwd;
            this.db = db;
            this.table = table;
        }

        public String getBrief() {
            Map<String, String> briefMap = Maps.newHashMap();
            briefMap.put("host", host);
            briefMap.put("port", String.valueOf(port));
            briefMap.put("user", user);
            briefMap.put("password", passwd);
            briefMap.put("database", db);
            briefMap.put("table", table);
            PrintableMap<String, String> printableMap = new PrintableMap<>(briefMap, "=", true, false, true);
            return printableMap.toString();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, host);
            out.writeInt(port);
            Text.writeString(out, user);
            Text.writeString(out, passwd);
            Text.writeString(out, db);
            Text.writeString(out, table);
        }

        public void readFields(DataInput in) throws IOException {
            host = Text.readString(in);
            port = in.readInt();
            user = Text.readString(in);
            passwd = Text.readString(in);
            db = Text.readString(in);
            table = Text.readString(in);
        }
    }

    public static class BrokerParam implements Writable {
        @SerializedName("b")
        private String brokerName;
        @SerializedName("pa")
        private String path;
        @SerializedName("pr")
        private Map<String, String> prop = Maps.newHashMap();

        // for persist
        public BrokerParam() {
        }

        public BrokerParam(String brokerName, String path, Map<String, String> prop) {
            this.brokerName = brokerName;
            this.path = path;
            this.prop = prop;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, brokerName);
            Text.writeString(out, path);
            out.writeInt(prop.size());
            for (Map.Entry<String, String> entry : prop.entrySet()) {
                Text.writeString(out, entry.getKey());
                Text.writeString(out, entry.getValue());
            }
        }

        public void readFields(DataInput in) throws IOException {
            brokerName = Text.readString(in);
            path = Text.readString(in);
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = Text.readString(in);
                String val = Text.readString(in);
                prop.put(key, val);
            }
        }
    }

    public static final String MYSQL_PROTOCOL = "MYSQL";
    public static final String BROKER_PROTOCOL = "BROKER";

    public static enum HubType {
        MYSQL_TYPE,
        BROKER_TYPE,
        NULL_TYPE
    }

    public static class Param implements Writable {
        @SerializedName(value = "t")
        private HubType type;
        @SerializedName(value = "m")
        private MysqlParam mysqlParam;
        @SerializedName(value = "b")
        private BrokerParam brokerParam;

        // for replay
        public Param() {
            type = HubType.NULL_TYPE;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        public static Param read(DataInput in) throws IOException {
            if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_134) {
                Param param = new Param();
                param.readFields(in);
                return param;
            } else {
                return GsonUtils.GSON.fromJson(Text.readString(in), Param.class);
            }
        }

        @Deprecated
        public void readFields(DataInput in) throws IOException {
            type = HubType.valueOf(Text.readString(in));
            switch (type) {
                case MYSQL_TYPE:
                    mysqlParam = new MysqlParam();
                    mysqlParam.readFields(in);
                    break;
                case BROKER_TYPE:
                    brokerParam = new BrokerParam();
                    brokerParam.readFields(in);
                    break;
                case NULL_TYPE:
                    break;
                default:
                    Preconditions.checkState(false, "unknown hub type");
            }
        }
    }
}