BDBEnvironment.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.journal.bdbje;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.BDBStateChangeListener;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
import org.apache.doris.system.Frontend;

import com.google.common.collect.ImmutableList;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.NoConsistencyRequiredPolicy;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.RollbackException;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.DbResetRepGroup;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.stream.Collectors;

/* This class holds the reference to the BDB JE replicated environment,
 * including all the opened databases and the ReplicationGroupAdmin.
 * Information about this BDB replication group can be obtained through the ReplicationGroupAdmin API.
 */
public class BDBEnvironment {
    private static final Logger LOG = LogManager.getLogger(BDBEnvironment.class);
    private static final int RETRY_TIME = 3;
    private static final List<String> BDBJE_LOG_LEVEL = ImmutableList.of("OFF", "SEVERE", "WARNING",
            "INFO", "CONFIG", "FINE", "FINER", "FINEST", "ALL");
    public static final String PALO_JOURNAL_GROUP = "PALO_JOURNAL_GROUP";

    private ReplicatedEnvironment replicatedEnvironment;
    private EnvironmentConfig environmentConfig;
    private ReplicationConfig replicationConfig;
    private DatabaseConfig dbConfig;
    private Database epochDB = null;  // used for fencing
    private ReentrantReadWriteLock lock;
    private List<Database> openedDatabases;

    private final boolean isElectable;
    private final boolean metadataFailureRecovery;

    /**
     * Creates the holder object. Nothing is opened here; the replicated environment
     * and its databases are opened later by {@code setup}.
     *
     * @param isElectable whether this node takes part as an electable member
     *                    (otherwise {@code setup} configures it as a SECONDARY node)
     * @param metadataFailureRecovery when true, {@code setup} resets the replication
     *                                group before opening the environment
     */
    public BDBEnvironment(boolean isElectable, boolean metadataFailureRecovery) {
        this.isElectable = isElectable;
        this.metadataFailureRecovery = metadataFailureRecovery;
        // fair lock: guards the list of opened database handles
        this.lock = new ReentrantReadWriteLock(true);
        this.openedDatabases = new ArrayList<>();
    }

    /**
     * Builds the replication, environment and database configurations and then opens
     * the replicated environment together with the epoch database.
     *
     * <p>When metadata failure recovery (or the compatibility-check mode) is enabled, the
     * replication group is reset first. Opening is attempted up to {@code RETRY_TIME}
     * times; an {@link InsufficientLogException} triggers a network restore before the
     * next attempt. If the environment still is not open after the last attempt the
     * process exits with status -1.
     *
     * @param envHome          directory holding the BDB JE environment files
     * @param selfNodeName     replication node name of this node
     * @param selfNodeHostPort "host:port" this node uses for replication
     * @param helperHostPort   "host:port" of the helper node(s) used to join the group
     */
    public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
                      String helperHostPort) {
        // Almost never used, just in case the master can not restart
        if (metadataFailureRecovery || Config.enable_check_compatibility_mode) {
            if (!isElectable && !Config.enable_check_compatibility_mode) {
                LOG.error("Current node is not in the electable_nodes list. will exit");
                System.exit(-1);
            }
            LOG.warn("start group reset");
            DbResetRepGroup resetUtility = new DbResetRepGroup(
                    envHome, PALO_JOURNAL_GROUP, selfNodeName, selfNodeHostPort);
            resetUtility.reset();
            LOG.warn("metadata recovery mode, group has been reset.");
        }

        // set replication config
        replicationConfig = new ReplicationConfig();
        replicationConfig.setNodeName(selfNodeName);
        replicationConfig.setNodeHostPort(selfNodeHostPort);
        replicationConfig.setHelperHosts(helperHostPort);
        replicationConfig.setGroupName(PALO_JOURNAL_GROUP);
        replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10 s");
        replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
        replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
                String.valueOf(Config.txn_rollback_limit));
        replicationConfig.setConfigParam(ReplicationConfig.REPLICA_TIMEOUT,
                Config.bdbje_heartbeat_timeout_second + " s");
        replicationConfig.setConfigParam(ReplicationConfig.FEEDER_TIMEOUT,
                Config.bdbje_heartbeat_timeout_second + " s");

        if (isElectable) {
            replicationConfig.setReplicaAckTimeout(Config.bdbje_replica_ack_timeout_second, TimeUnit.SECONDS);
            replicationConfig.setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT, "0");
            replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
        } else {
            // non-electable nodes join the group as SECONDARY members
            replicationConfig.setNodeType(NodeType.SECONDARY);
            replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
        }
        replicationConfig.setConfigParam(ReplicationConfig.MAX_MESSAGE_SIZE,
                                         String.valueOf(Config.bdbje_max_message_size_bytes));

        // set environment config
        environmentConfig = new EnvironmentConfig();
        environmentConfig.setTransactional(true);
        environmentConfig.setAllowCreate(true);
        environmentConfig.setLockTimeout(Config.bdbje_lock_timeout_second, TimeUnit.SECONDS);
        environmentConfig.setConfigParam(EnvironmentConfig.RESERVED_DISK,
                String.valueOf(Config.bdbje_reserved_disk_bytes));
        environmentConfig.setConfigParam(EnvironmentConfig.FREE_DISK,
                String.valueOf(Config.bdbje_free_disk_bytes));
        environmentConfig.setCacheSize(Config.bdbje_cache_size_bytes);

        if (Config.ignore_bdbje_log_checksum_read) {
            environmentConfig.setConfigParam(EnvironmentConfig.LOG_CHECKSUM_READ, "false");
            LOG.warn("set EnvironmentConfig.LOG_CHECKSUM_READ false");
        }

        // only apply the configured logging level when it is one of the known level names
        if (BDBJE_LOG_LEVEL.contains(Config.bdbje_file_logging_level)) {
            java.util.logging.Logger parent = java.util.logging.Logger.getLogger("com.sleepycat.je");
            parent.setLevel(Level.parse(Config.bdbje_file_logging_level));
            environmentConfig.setConfigParam(EnvironmentConfig.FILE_LOGGING_LEVEL, Config.bdbje_file_logging_level);
        } else {
            LOG.warn("bdbje_file_logging_level invalid value: {}, will not take effort, use default",
                    Config.bdbje_file_logging_level);
        }

        if (isElectable) {
            Durability durability = new Durability(getSyncPolicy(Config.master_sync_policy),
                    getSyncPolicy(Config.replica_sync_policy), getAckPolicy(Config.replica_ack_policy));
            environmentConfig.setDurability(durability);
        }

        // set database config
        dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        if (isElectable) {
            dbConfig.setAllowCreate(true);
            dbConfig.setReadOnly(false);
        } else {
            dbConfig.setAllowCreate(false);
            dbConfig.setReadOnly(true);
        }

        // open environment and epochDB
        boolean opened = false;
        for (int i = 0; i < RETRY_TIME; i++) {
            try {
                if (replicatedEnvironment != null) {
                    // a previous attempt may have left a half-opened environment behind
                    this.close();
                }
                // open the environment
                replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

                // get a BDBHA object and pass the reference to Catalog
                HAProtocol protocol = new BDBHA(this, selfNodeName);
                Env.getCurrentEnv().setHaProtocol(protocol);

                // start state change listener
                StateChangeListener listener = new BDBStateChangeListener(isElectable);
                replicatedEnvironment.setStateChangeListener(listener);
                // open epochDB. the first parameter null means auto-commit
                epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
                opened = true;
                break;
            } catch (InsufficientLogException insufficientLogEx) {
                LOG.info("i:{} insufficientLogEx:", i, insufficientLogEx);
                NetworkRestore restore = new NetworkRestore();
                NetworkRestoreConfig config = new NetworkRestoreConfig();
                config.setRetainLogFiles(false); // delete obsolete log files.
                // Use the members returned by insufficientLogEx.getLogProviders()
                // to select the desired subset of members and pass the resulting
                // list as the argument to config.setLogProviders(), if the
                // default selection of providers is not suitable.
                restore.execute(insufficientLogEx, config);
            } catch (DatabaseException e) {
                LOG.info("i:{} exception:", i, e);
                if (i < RETRY_TIME - 1) {
                    try {
                        Thread.sleep(5 * 1000);
                    } catch (InterruptedException e1) {
                        // the interrupt only shortens the back-off; the next attempt still runs
                        LOG.warn("", e1);
                    }
                } else {
                    LOG.error("error to open replicated environment. will exit.", e);
                    System.exit(-1);
                }
            }
        }

        if (!opened) {
            // Reached when the final attempt ended in a network restore instead of a
            // successful open: fail fast here, like the DatabaseException path above,
            // rather than returning without a usable environment.
            LOG.error("error to open replicated environment after {} attempts. will exit.", RETRY_TIME);
            System.exit(-1);
        }
    }

    /**
     * Creates a {@link ReplicationGroupAdmin} for the journal group, addressed at the
     * edit-log host/port of every FOLLOWER frontend that is currently alive.
     *
     * @return the admin handle, or {@code null} when no alive follower was found
     */
    public ReplicationGroupAdmin getReplicationGroupAdmin() {
        Set<InetSocketAddress> aliveFollowers = Env.getCurrentEnv()
                .getFrontends(FrontendNodeType.FOLLOWER)
                .stream()
                .filter(node -> node.isAlive())
                .map(node -> new InetSocketAddress(node.getHost(), node.getEditLogPort()))
                .collect(Collectors.toSet());

        if (!aliveFollowers.isEmpty()) {
            return new ReplicationGroupAdmin(PALO_JOURNAL_GROUP, aliveFollowers);
        }

        LOG.info("addresses is empty");
        return null;
    }

    /**
     * Returns the handle to the epoch database (used for fencing).
     *
     * @return the epochDB handle; {@code null} until the environment has been opened
     *         and again after a successful {@code close()}
     */
    public Database getEpochDB() {
        return this.epochDB;
    }

    /**
     * Returns the handle to the underlying replicated environment.
     *
     * @return the current environment handle; {@code null} after a successful {@code close()}
     */
    public ReplicatedEnvironment getReplicatedEnvironment() {
        return this.replicatedEnvironment;
    }

    /**
     * Returns a handle to the database named {@code dbName}, reusing an already opened
     * handle when one exists and opening a new one otherwise.
     *
     * <p>While scanning the cached handles, stale ones are dropped: a handle whose name
     * is {@code null} is closed and removed, and a handle whose name lookup or close
     * throws is removed and the exception is logged and ignored.
     *
     * @param dbName name of the database to open
     * @return the cached or newly opened handle, or {@code null} if opening failed
     *         (the failure is logged)
     */
    public Database openDatabase(String dbName) {
        Database db = null;
        lock.writeLock().lock();
        try {
            // find if the specified database is already opened. find and return it.
            for (java.util.Iterator<Database> iter = openedDatabases.iterator(); iter.hasNext();) {
                Database openedDb = iter.next();
                // Read the name exactly once, inside the guarded region: a second lookup
                // outside the try could throw for a handle that was preempted in between.
                String openedName;
                try {
                    openedName = openedDb.getDatabaseName();
                    if (openedName == null) {
                        openedDb.close();
                        iter.remove();
                        continue;
                    }
                } catch (Exception e) {
                    /*
                     * In the case when 3 FE (1 master and 2 followers) start at the same time,
                     * we may catch com.sleepycat.je.rep.DatabasePreemptedException which says that
                     * "Database xx has been forcibly closed in order to apply a replicated remove operation."
                     *
                     * When the Master FE has finished saving an image, it tries to remove old journals,
                     * and also removes the databases these old journals belong to.
                     * So after the Master has removed the database from replicatedEnvironment,
                     * calling db.getDatabaseName() will throw DatabasePreemptedException,
                     * because the database has already been destroyed.
                     *
                     * The Master can safely remove a database because it knows that all
                     * non-master FEs have already loaded the journals ahead of this database.
                     *
                     * Here we just try to close the useless database (which may have been removed by
                     * the Master), so even if we catch the exception, ignoring it is OK.
                     */
                    LOG.warn("get exception when try to close previously opened bdb database. ignore it", e);
                    iter.remove();
                    continue;
                }

                if (openedName.equals(dbName)) {
                    return openedDb;
                }
            }

            // open the specified database.
            // the first parameter null means auto-commit
            try {
                db = replicatedEnvironment.openDatabase(null, dbName, dbConfig);
                openedDatabases.add(db);
            } catch (Exception e) {
                LOG.warn("catch an exception when open database {}", dbName, e);
            }
        } finally {
            lock.writeLock().unlock();
        }
        return db;
    }

    /**
     * Closes the cached handle of the database named {@code dbName} (if one is open),
     * drops it from the list of opened databases and then removes the database from
     * the replicated environment.
     *
     * <p>A {@link DatabaseNotFoundException} from the environment is logged and ignored.
     *
     * @param dbName name of the database to close and remove
     */
    public void removeDatabase(String dbName) {
        lock.writeLock().lock();
        try {
            // at most one cached handle is closed and dropped: stop at the first name match
            for (java.util.Iterator<Database> it = openedDatabases.iterator(); it.hasNext();) {
                Database candidate = it.next();
                String candidateName = candidate.getDatabaseName();
                if (!dbName.equals(candidateName)) {
                    continue;
                }
                candidate.close();
                LOG.info("database {} has been closed", candidateName);
                LOG.info("begin to remove database {} from openedDatabases", candidateName);
                it.remove();
                break;
            }

            LOG.info("begin to remove database {} from replicatedEnvironment", dbName);
            try {
                // the first parameter null means auto-commit
                replicatedEnvironment.removeDatabase(null, dbName);
            } catch (DatabaseNotFoundException e) {
                LOG.warn("catch an exception when remove db:{}, this db does not exist", dbName, e);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Lists the journal databases of the environment as sorted numeric ids.
     *
     * <p>Database names that are not purely numeric are skipped. An
     * {@link EnvironmentFailureException} is retried (one second apart) and the process
     * exits once {@code RETRY_TIME} failures have occurred; {@link InsufficientLogException}
     * and {@link RollbackException} are rethrown to the caller.
     *
     * @return the numeric database names in ascending order, or {@code null} when any
     *         other {@link DatabaseException} occurred
     */
    public List<Long> getDatabaseNames() {
        // An earlier operation may have left the current thread flagged as interrupted.
        // The flag MUST be cleared here, otherwise replicatedEnvironment.getDatabaseNames()
        // will fail because it calls lock.tryLock(), which checks the interrupted flag.
        Thread.interrupted();

        List<String> rawNames = null;
        int failures = 0;
        while (true) {
            try {
                rawNames = replicatedEnvironment.getDatabaseNames();
                break;
            } catch (InsufficientLogException | RollbackException e) {
                // both are EnvironmentFailureException subtypes that the caller must see
                throw e;
            } catch (EnvironmentFailureException e) {
                if (++failures == RETRY_TIME) {
                    LOG.error("bdb environment failure exception.", e);
                    System.exit(-1);
                }
                LOG.warn("bdb environment failure exception. will retry", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    LOG.warn("", e1);
                }
            } catch (DatabaseException e) {
                LOG.warn("catch an exception when calling getDatabaseNames", e);
                return null;
            }
        }

        if (rawNames == null) {
            return new ArrayList<>();
        }
        // keep only the numeric names, i.e. the journal databases
        return rawNames.stream()
                .filter(StringUtils::isNumeric)
                .map(Long::parseLong)
                .sorted()
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Closes every opened database, the epoch database and finally the replicated
     * environment.
     *
     * <p>A {@link DatabaseException} raised by any individual close is logged and the
     * remaining handles are still closed. The list of opened databases is always
     * cleared; {@code epochDB} and {@code replicatedEnvironment} are reset to
     * {@code null} only when their close succeeded.
     */
    public void close() {
        for (Database db : openedDatabases) {
            try {
                db.close();
            } catch (DatabaseException exception) {
                LOG.error("Error closing db {} will exit", safeDatabaseName(db), exception);
            }
        }
        openedDatabases.clear();

        if (epochDB != null) {
            try {
                epochDB.close();
                epochDB = null;
            } catch (DatabaseException exception) {
                LOG.error("Error closing db {} will exit", safeDatabaseName(epochDB), exception);
            }
        }

        if (replicatedEnvironment != null) {
            try {
                // Finally, close the store and environment.
                replicatedEnvironment.close();
                replicatedEnvironment = null;
            } catch (DatabaseException exception) {
                LOG.error("Error closing replicatedEnvironment", exception);
            }
        }
    }

    // Best-effort name lookup for log messages: getDatabaseName() can itself throw on a
    // broken handle, and that must not abort the remaining cleanup in close().
    private static String safeDatabaseName(Database db) {
        try {
            return db.getDatabaseName();
        } catch (Exception ignored) {
            // the name is only needed for the log line; fall back to a placeholder
            return "<unknown>";
        }
    }

    /**
     * (Re)opens the replicated environment and the epoch database using the
     * configurations already stored in this object.
     *
     * <p>An environment that is still open is closed first. Up to {@code RETRY_TIME}
     * attempts are made, five seconds apart; if the last one fails the process exits
     * with status -1.
     *
     * @param envHome directory holding the BDB JE environment files
     */
    public void openReplicatedEnvironment(File envHome) {
        final int lastAttempt = RETRY_TIME - 1;
        for (int attempt = 0; attempt < RETRY_TIME; attempt++) {
            try {
                if (replicatedEnvironment != null) {
                    close();
                }

                // open the environment and attach the state change listener
                replicatedEnvironment =
                        new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);
                replicatedEnvironment.setStateChangeListener(new BDBStateChangeListener(isElectable));

                // open epochDB; a null transaction means auto-commit
                epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
                break;
            } catch (DatabaseException e) {
                LOG.info("i:{} exception:", attempt, e);
                if (attempt == lastAttempt) {
                    LOG.error("error to open replicated environment. will exit.", e);
                    System.exit(-1);
                } else {
                    // back off before the next attempt
                    try {
                        Thread.sleep(5 * 1000);
                    } catch (InterruptedException e1) {
                        LOG.warn("", e1);
                    }
                }
            }
        }
    }

    /**
     * Maps a configuration string to a {@link SyncPolicy}, ignoring case.
     *
     * @param policy "SYNC" or "NO_SYNC"; any other value selects the default
     * @return the matching policy, {@code WRITE_NO_SYNC} by default
     */
    private static SyncPolicy getSyncPolicy(String policy) {
        final SyncPolicy resolved;
        if (policy.equalsIgnoreCase("SYNC")) {
            resolved = SyncPolicy.SYNC;
        } else if (policy.equalsIgnoreCase("NO_SYNC")) {
            resolved = SyncPolicy.NO_SYNC;
        } else {
            // default value is WRITE_NO_SYNC
            resolved = SyncPolicy.WRITE_NO_SYNC;
        }
        return resolved;
    }

    /**
     * Maps a configuration string to a {@link ReplicaAckPolicy}, ignoring case.
     *
     * @param policy "ALL" or "NONE"; any other value selects the default
     * @return the matching policy, {@code SIMPLE_MAJORITY} by default
     */
    private static ReplicaAckPolicy getAckPolicy(String policy) {
        final ReplicaAckPolicy resolved;
        if (policy.equalsIgnoreCase("ALL")) {
            resolved = ReplicaAckPolicy.ALL;
        } else if (policy.equalsIgnoreCase("NONE")) {
            resolved = ReplicaAckPolicy.NONE;
        } else {
            // default value is SIMPLE_MAJORITY
            resolved = ReplicaAckPolicy.SIMPLE_MAJORITY;
        }
        return resolved;
    }

    /**
     * Describes why the replicated environment is not ready.
     *
     * <p>Checks, in order: a missing environment handle, an invalidating exception on
     * the environment, and a disk limit violation. Any exception thrown while probing
     * is logged and treated as "no reason found".
     *
     * @return the reason text, or an empty string when none of the checks reported a problem
     */
    public String getNotReadyReason() {
        // snapshot the field so the null check and the later calls see the same handle
        ReplicatedEnvironment env = replicatedEnvironment;
        if (env == null) {
            LOG.warn("replicatedEnvironment is null");
            return "replicatedEnvironment is null";
        }
        try {
            // Read each value exactly once: checking and then re-reading could observe a
            // different (possibly null) value on the second call.
            Throwable invalidating = env.getInvalidatingException();
            if (invalidating != null) {
                return invalidating.getMessage();
            }

            String diskLimitViolation = RepInternal.getNonNullRepImpl(env).getDiskLimitViolation();
            if (diskLimitViolation != null) {
                return diskLimitViolation;
            }
        } catch (Exception e) {
            LOG.warn("getNotReadyReason exception:", e);
        }
        return "";
    }

}