BDBHA.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.ha;
import org.apache.doris.catalog.Env;
import org.apache.doris.journal.bdbje.BDBEnvironment;
import com.google.common.collect.Lists;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.MemberNotFoundException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationGroup;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
public class BDBHA implements HAProtocol {
    private static final Logger LOG = LogManager.getLogger(BDBHA.class);
    private final BDBEnvironment environment;
    private final String nodeName;
    private static final int RETRY_TIME = 3;
    // Unstable node is a follower node that is joining the cluster but have not
    // completed.
    // We should record this kind node and set the bdb electable group size to
    // (size_of_all_followers - size_of_unstable_nodes).
    // Because once the handshake is successful, the joined node is put into the
    // optional group,
    // but it may take a little time for this node to replicate the historical data.
    // This node will never respond to a new data replication until the historical
    // replication is completed,
    // and if the master cannot receive a quorum response, the write operation will
    // fail.
    private final Set<String> unReadyElectableNodes = new HashSet<>();
    public BDBHA(BDBEnvironment env, String nodeName) {
        this.environment = env;
        this.nodeName = nodeName;
    }
    @Override
    public boolean fencing() {
        Database epochDb = environment.getEpochDB();
        for (int i = 0; i < RETRY_TIME; i++) {
            try {
                long count = epochDb.count();
                long myEpoch = count + 1;
                LOG.info("start fencing, epoch number is {}", myEpoch);
                Long key = myEpoch;
                DatabaseEntry theKey = new DatabaseEntry();
                TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
                idBinding.objectToEntry(key, theKey);
                DatabaseEntry theData = new DatabaseEntry(new byte[1]);
                OperationStatus status = epochDb.putNoOverwrite(null, theKey, theData);
                if (status == OperationStatus.SUCCESS) {
                    Env.getCurrentEnv().setEpoch(myEpoch);
                    return true;
                } else if (status == OperationStatus.KEYEXIST) {
                    return false;
                } else {
                    throw new Exception(status.toString());
                }
            } catch (Exception e) {
                LOG.warn("fencing failed. tried {} times", i, e);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    LOG.warn("fencing sleep exception:", e1);
                }
            }
        }
        return false;
    }
    @Override
    public List<InetSocketAddress> getObserverNodes() {
        List<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return ret;
        }
        try {
            ReplicationGroup replicationGroup = replicationGroupAdmin.getGroup();
            for (ReplicationNode replicationNode : replicationGroup.getSecondaryNodes()) {
                ret.add(replicationNode.getSocketAddress());
            }
        } catch (UnknownMasterException e) {
            LOG.warn("Catch UnknownMasterException when calling getObserverNodes.", e);
            return Lists.newArrayList();
        }
        return ret;
    }
    @Override
    public List<InetSocketAddress> getElectableNodes(boolean leaderIncluded) {
        List<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return ret;
        }
        try {
            ReplicationGroup replicationGroup = replicationGroupAdmin.getGroup();
            for (ReplicationNode replicationNode : replicationGroup.getElectableNodes()) {
                if (leaderIncluded) {
                    ret.add(replicationNode.getSocketAddress());
                } else {
                    if (!replicationNode.getName()
                            .equals(replicationGroupAdmin.getMasterNodeName())) {
                        ret.add(replicationNode.getSocketAddress());
                    }
                }
            }
        } catch (UnknownMasterException e) {
            LOG.warn("Catch UnknownMasterException when calling getElectableNodes.", e);
            return Lists.newArrayList();
        }
        return ret;
    }
    @Override
    public InetSocketAddress getLeader() {
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        String leaderName = replicationGroupAdmin.getMasterNodeName();
        ReplicationGroup rg = replicationGroupAdmin.getGroup();
        ReplicationNode rn = rg.getMember(leaderName);
        return rn.getSocketAddress();
    }
    @Override
    public boolean removeElectableNode(String nodeName) {
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return false;
        }
        try {
            LOG.info("remove electable node: {}", nodeName);
            replicationGroupAdmin.removeMember(nodeName);
        } catch (MemberNotFoundException e) {
            LOG.warn("the electable node is not found {}", nodeName, e);
            return false;
        } catch (Exception e) {
            LOG.error("remove electable node {} meeting unkown exception:", nodeName, e);
            System.exit(-1);
        }
        return true;
    }
    public boolean updateNodeAddress(String nodeName, String newHostName, int port) {
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return false;
        }
        try {
            LOG.info("update electable node {} with new host name: {}, port: {}", nodeName, newHostName, port);
            replicationGroupAdmin.updateAddress(nodeName, newHostName, port);
        } catch (MemberNotFoundException e) {
            LOG.error("the updating electable node is not found {}", nodeName, e);
            return false;
        } catch (MasterStateException e) {
            LOG.error("the updating electable node is master {}", nodeName, e);
            return false;
        }
        return true;
    }
    public void removeConflictNodeIfExist(String host, int port) {
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return;
        }
        List<String> conflictNodes = Lists.newArrayList();
        Set<ReplicationNode> replicationNodes = replicationGroupAdmin.getGroup().getElectableNodes();
        for (ReplicationNode replicationNode : replicationNodes) {
            if (replicationNode.getHostName().equals(host) && replicationNode.getPort() == port) {
                conflictNodes.add(replicationNode.getName());
            }
        }
        for (String conflictNode : conflictNodes) {
            removeElectableNode(conflictNode);
        }
    }
    public synchronized void addUnReadyElectableNode(String nodeName, int totalFollowerCount) {
        unReadyElectableNodes.add(nodeName);
        ReplicatedEnvironment replicatedEnvironment = environment.getReplicatedEnvironment();
        if (replicatedEnvironment != null) {
            int override = totalFollowerCount - unReadyElectableNodes.size();
            LOG.info("set electable group size override to {}, total follower count: {}, add unready node: {}",
                    override, totalFollowerCount, nodeName);
            replicatedEnvironment.setRepMutableConfig(new ReplicationMutableConfig()
                    .setElectableGroupSizeOverride(override));
        }
    }
    public synchronized void removeUnReadyElectableNode(String nodeName, int totalFollowerCount) {
        unReadyElectableNodes.remove(nodeName);
        ReplicatedEnvironment replicatedEnvironment = environment.getReplicatedEnvironment();
        if (replicatedEnvironment != null) {
            if (unReadyElectableNodes.isEmpty()) {
                // Setting ElectableGroupSizeOverride to 0 means remove this config,
                // and bdb will use the normal electable group size.
                LOG.info("remove electable group size override, total follower count: {}, remove unready node: {}",
                        totalFollowerCount, nodeName);
                replicatedEnvironment.setRepMutableConfig(
                        new ReplicationMutableConfig().setElectableGroupSizeOverride(0));
            } else {
                int override = totalFollowerCount - unReadyElectableNodes.size();
                LOG.info("set electable group size override to {}, total follower count: {}, remove unready node: {}",
                        override, totalFollowerCount, nodeName);
                replicatedEnvironment.setRepMutableConfig(new ReplicationMutableConfig()
                        .setElectableGroupSizeOverride(override));
            }
        }
    }
    public void removeDroppedMember(ConcurrentLinkedQueue<String> removedFrontends) {
        ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
        if (replicationGroupAdmin == null) {
            return;
        }
        Set<ReplicationNode> replicationNodes = replicationGroupAdmin.getGroup().getElectableNodes();
        LOG.debug("removedFrontends:{}", removedFrontends);
        for (ReplicationNode replicationNode : replicationNodes) {
            LOG.debug("node:{}", replicationNode.toString());
            if (removedFrontends.contains(replicationNode.getName())) {
                try {
                    replicationGroupAdmin.removeMember(replicationNode.getName());
                } catch (MemberNotFoundException e) {
                    LOG.warn("the electable node is not found {}", replicationNode.getName());
                } catch (Exception e) {
                    LOG.error("remove electable node {} meeting unknown exception:", replicationNode.getName(), e);
                    System.exit(-1);
                }
            }
        }
    }
}