BDBJEJournal.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.OperationType;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RollbackException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*
* This is the bdb implementation of Journal interface.
* First, we open() this journal, then read from or write to the bdb environment
* We can also get journal id information by calling get***Id functions.
* Finally, close this journal.
* This class encapsulates the read, write APIs of bdbje
*/
public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use uppercase
public static final Logger LOG = LogManager.getLogger(BDBJEJournal.class);
private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
private static final int RETRY_TIME = 3;
private String environmentPath = null;
private String selfNodeName;
private String selfNodeHostPort;
private BDBEnvironment bdbEnvironment = null;
private Database currentJournalDB;
// the next journal's id. start from 1.
private AtomicLong nextJournalId = new AtomicLong(1);
public BDBJEJournal(String nodeName) {
environmentPath = Env.getServingEnv().getBdbDir();
HostInfo selfNode = Env.getServingEnv().getSelfNode();
selfNodeName = nodeName;
// We use the hostname as the address of the bdbje node,
// so that we do not need to update bdbje when the IP changes.
// WARNING:However, it is necessary to ensure that the hostname of the node
// can be resolved and accessed by other nodes.
selfNodeHostPort = NetUtils.getHostPortInAccessibleFormat(selfNode.getHost(), selfNode.getPort());
}
/*
* Database is named by its minimum journal id.
* For example:
* One database contains journal 100 to journal 200, its name is 100.
* The next database's name is 201
*/
@Override
public synchronized void rollJournal() {
// Doesn't need to roll if current database contains no journals
if (currentJournalDB.count() == 0) {
return;
}
long newName = nextJournalId.get();
String currentDbName = currentJournalDB.getDatabaseName();
long currentName = Long.parseLong(currentDbName);
long newNameVerify = currentName + currentJournalDB.count();
if (newName == newNameVerify) {
LOG.info("roll edit log. new dbName: {}, old dbName:{}", newName, currentDbName);
currentJournalDB = bdbEnvironment.openDatabase(Long.toString(newName));
} else {
String msg = String.format("roll journal error! journalId and db journal numbers is not match. "
+ "journal id: %d, current db: %s, expected db count: %d",
newName, currentDbName, newNameVerify);
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
}
}
@Override
public synchronized long write(JournalBatch batch) throws IOException {
List<JournalBatch.Entity> entities = batch.getJournalEntities();
int entitySize = entities.size();
long dataSize = 0;
long firstId = nextJournalId.getAndAdd(entitySize);
// Write the journals to bdb.
for (int i = 0; i < RETRY_TIME; i++) {
Transaction txn = null;
StopWatch watch = StopWatch.createStarted();
try {
// The default config is constructed from the configs of environment.
txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, null);
dataSize = 0;
for (int j = 0; j < entitySize; ++j) {
JournalBatch.Entity entity = entities.get(j);
DatabaseEntry theKey = idToKey(firstId + j);
DatabaseEntry theData = new DatabaseEntry(entity.getBinaryData());
currentJournalDB.put(txn, theKey, theData); // Put with overwrite, it always success
dataSize += theData.getSize();
if (i == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("opCode = {}, journal size = {}", entity.getOpCode(), theData.getSize());
}
}
}
txn.commit();
txn = null;
if (MetricRepo.isInit) {
MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase(dataSize);
MetricRepo.HISTO_JOURNAL_BATCH_SIZE.update(entitySize);
MetricRepo.HISTO_JOURNAL_BATCH_DATA_SIZE.update(dataSize);
}
if (entitySize > Config.batch_edit_log_max_item_num) {
LOG.warn("write bdb journal batch is too large, batch size {}, the first journal id {}, "
+ "data size {}", entitySize, firstId, dataSize);
}
if (dataSize > Config.batch_edit_log_max_byte_size) { // 640KB
LOG.warn("write bdb journal batch data is too large, data size {}, the first journal id {}, "
+ "batch size {}", dataSize, firstId, entitySize);
}
return firstId;
} catch (ReplicaWriteException e) {
/**
* This exception indicates that an update operation or transaction commit
* or abort was attempted while in the
* {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked
* as being invalid.
* <p>
* The exception is the result of either an error in the application logic or
* the result of a transition of the node from Master to Replica while a
* transaction was in progress.
* <p>
* The application must abort the current transaction and redirect all
* subsequent update operations to the Master.
*/
LOG.error("catch ReplicaWriteException when writing to database, will exit. the first journal id {}",
firstId, e);
String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
} catch (DatabaseException e) {
LOG.error("catch an exception when writing to database. sleep and retry. the first journal id {}",
firstId, e);
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e1) {
LOG.warn("", e1);
}
} finally {
if (txn != null) {
txn.abort();
}
watch.stop();
if (watch.getTime() > 100000) { // 100ms
LOG.warn("write bdb is too slow, cost {}ms, the first journal id, batch size {}, data size{}",
watch.getTime(), firstId, entitySize, dataSize);
}
}
}
String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
return 0; // unreachable!
}
@Override
public synchronized long write(short op, Writable writable) throws IOException {
// The operation before may set the current thread as interrupted.
// MUST reset the interrupted flag of current thread to false,
// otherwise edit log writing may fail because it will call lock.tryLock(),
// which will check the interrupted flag.
Thread.interrupted();
JournalEntity entity = new JournalEntity();
entity.setOpCode(op);
entity.setData(writable);
// id is the key
long id = nextJournalId.getAndIncrement();
DatabaseEntry theKey = idToKey(id);
// entity is the value
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
entity.write(buffer);
DatabaseEntry theData = new DatabaseEntry(buffer.getData());
if (MetricRepo.isInit) {
MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase((long) theData.getSize());
}
if (LOG.isDebugEnabled() || theData.getSize() > (1 << 20)) {
LOG.info("opCode = {}, journal size = {}", op, theData.getSize());
if (MetricRepo.isInit) {
MetricRepo.COUNTER_LARGE_EDIT_LOG.increase(1L);
}
}
// Write the key value pair to bdb.
boolean writeSucceed = false;
// ATTN: If all the followers exit except master, master should continue provide
// query service, so do not exit if the write operation is OP_TIMESTAMP.
//
// Because BDBJE will replicate the committed txns to FOLLOWERs after the connection
// resumed, directly reseting the next journal id and returning will cause subsequent
// txn written to the same journal ID not to be replayed by the FOLLOWERS. So for
// OP_TIMESTAMP operation, try to write until it succeeds here.
int retryTimes = op == OperationType.OP_TIMESTAMP ? Integer.MAX_VALUE : RETRY_TIME;
for (int i = 0; i < retryTimes; i++) {
try {
// Parameter null means auto commit
if (currentJournalDB.put(null, theKey, theData) == OperationStatus.SUCCESS) {
writeSucceed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("master write journal {} finished. db name {}, current time {}",
id, currentJournalDB.getDatabaseName(), System.currentTimeMillis());
}
break;
}
} catch (ReplicaWriteException e) {
/**
* This exception indicates that an update operation or transaction commit
* or abort was attempted while in the
* {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked
* as being invalid.
* <p>
* The exception is the result of either an error in the application logic or
* the result of a transition of the node from Master to Replica while a
* transaction was in progress.
* <p>
* The application must abort the current transaction and redirect all
* subsequent update operations to the Master.
*/
LOG.error("catch ReplicaWriteException when writing to database, will exit. journal id {}", id, e);
String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
} catch (DatabaseException e) {
LOG.error("catch an exception when writing to database. sleep and retry. journal id {}", id, e);
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e1) {
LOG.warn("", e1);
}
}
}
if (!writeSucceed) {
String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
LogUtils.stderr(msg);
System.exit(-1);
}
return id;
}
private static DatabaseEntry idToKey(Long id) {
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
idBinding.objectToEntry(id, theKey);
return theKey;
}
@Override
public JournalEntity read(long journalId) {
List<Long> dbNames = getDatabaseNames();
if (dbNames == null) {
return null;
}
String dbName = null;
for (long db : dbNames) {
if (journalId >= db) {
dbName = Long.toString(db);
continue;
} else {
break;
}
}
if (dbName == null) {
return null;
}
JournalEntity ret = null;
Long key = journalId;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class);
myBinding.objectToEntry(key, theKey);
DatabaseEntry theData = new DatabaseEntry();
Database database = bdbEnvironment.openDatabase(dbName);
try {
// null means perform the operation without transaction protection.
// READ_COMMITTED guarantees no dirty read.
if (database.get(null, theKey, theData, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS) {
// Recreate the data String.
byte[] retData = theData.getData();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData));
ret = new JournalEntity();
try {
ret.readFields(in);
} catch (IOException e) {
LOG.warn("", e);
}
} else {
LOG.warn("No record found for key '{}'.", journalId);
}
} catch (Exception e) {
LOG.warn("catch an exception when get JournalEntity. key:{}", journalId, e);
return null;
}
return ret;
}
@Override
public JournalCursor read(long fromKey, long toKey) {
return BDBJournalCursor.getJournalCursor(bdbEnvironment, fromKey, toKey);
}
@Override
public long getMaxJournalId() {
if (Config.enable_check_compatibility_mode) {
return getMaxJournalIdWithoutCheck();
}
return getMaxJournalIdInternal(true);
}
// get max journal id but do not check whether the txn is matched.
private long getMaxJournalIdWithoutCheck() {
return getMaxJournalIdInternal(false);
}
private long getMaxJournalIdInternal(boolean checkTxnMatched) {
long ret = -1;
if (bdbEnvironment == null) {
return ret;
}
List<Long> dbNames = getDatabaseNames();
if (dbNames == null) {
return ret;
}
if (dbNames.isEmpty()) {
return ret;
}
int index = dbNames.size() - 1;
String dbName = dbNames.get(index).toString();
long dbNumberName = dbNames.get(index);
Database database = bdbEnvironment.openDatabase(dbName);
if (checkTxnMatched && !isReplicaTxnAreMatched(database, dbNumberName)) {
LOG.warn("The current replica hasn't synced up with the master, current db name: {}", dbNumberName);
if (index != 0) {
// Because roll journal occurs after write, the previous write must have
// been replicated to the majority, so it can be guaranteed that the database
// will not be rollback.
return dbNumberName - 1;
}
return -1;
}
return dbNumberName + database.count() - 1;
}
// Whether the replica txns are matched with the master.
//
// BDBJE could throw InsufficientAcksException during post commit, at that time the
// log has persisted in disk. When the replica is restarted, we need to ensure that
// before replaying the journals, sync up txns with the new master in the cluster and
// rollback the txns that have been persisted but have not committed to the majority.
//
// See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details.
private boolean isReplicaTxnAreMatched(Database database, Long id) {
// The time lag is set to Integer.MAX_VALUE if the replica haven't synced up
// with the master. By allowing a very large lag, we can detect whether the
// replica has synced up with the master.
TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
Transaction txn = null;
try {
TransactionConfig cfg = new TransactionConfig()
.setReadOnly(true)
.setReadCommitted(true)
.setConsistencyPolicy(consistencyPolicy);
txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
DatabaseEntry key = idToKey(id);
database.get(txn, key, null, LockMode.READ_COMMITTED);
return true;
} catch (ReplicaConsistencyException e) {
return false;
} finally {
if (txn != null) {
txn.abort();
}
}
}
@Override
public long getMinJournalId() {
long ret = -1;
if (bdbEnvironment == null) {
return ret;
}
List<Long> dbNames = getDatabaseNames();
if (dbNames == null) {
return ret;
}
if (dbNames.isEmpty()) {
return ret;
}
String dbName = dbNames.get(0).toString();
Database database = bdbEnvironment.openDatabase(dbName);
// The database is empty
if (database.count() == 0) {
return ret;
}
return dbNames.get(0);
}
@Override
public void close() {
bdbEnvironment.close();
bdbEnvironment = null;
}
/*
* open the bdbje environment, and get the current journal database
*/
@Override
public synchronized void open() {
if (bdbEnvironment == null) {
File dbEnv = new File(environmentPath);
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
bdbEnvironment = new BDBEnvironment(Env.getServingEnv().isElectable(), metadataFailureRecovery);
HostInfo helperNode = Env.getServingEnv().getHelperNode();
String helperHostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort());
try {
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort);
} catch (Exception e) {
if (e instanceof DatabaseNotFoundException) {
LOG.error("It is not allowed to set metadata_failure_recovery"
+ "when meta dir or bdbje dir is empty, which may mean it is "
+ "the first time to start this node");
}
LOG.error("catch an exception when setup bdb environment. will exit.", e);
System.exit(-1);
}
}
// Open a new journal database or get last existing one as current journal
// database
List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
dbNames = getDatabaseNames();
if (dbNames == null) {
LOG.error("fail to get dbNames while open bdbje journal. will exit");
System.exit(-1);
}
if (dbNames.isEmpty()) {
/*
* This is the very first time to open. Usually, we will open a new database
* named "1".
* But when we start cluster with an image file copied from other cluster,
* here we should open database with name image max journal id + 1.
* (default Catalog.getServingEnv().getReplayedJournalId() is 0)
*/
String dbName = Long.toString(Env.getServingEnv().getReplayedJournalId() + 1);
LOG.info("the very first time to open bdb, dbname is {}", dbName);
currentJournalDB = bdbEnvironment.openDatabase(dbName);
} else {
// get last database as current journal database
currentJournalDB = bdbEnvironment.openDatabase(dbNames.get(dbNames.size() - 1).toString());
}
// set next journal id
nextJournalId.set(getMaxJournalIdWithoutCheck() + 1);
break;
} catch (InsufficientLogException insufficientLogEx) {
reSetupBdbEnvironment(insufficientLogEx);
} catch (RollbackException rollbackEx) {
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
}
}
}
private void reSetupBdbEnvironment(InsufficientLogException insufficientLogEx) {
LOG.warn("catch insufficient log exception. will recover and try again.", insufficientLogEx);
// Copy the missing log files from a member of the replication group who owns
// the files
// ATTN: here we use `getServingEnv()`, because only serving catalog has
// helper nodes.
HostInfo helperNode = Env.getServingEnv().getHelperNode();
for (int i = 0; i < RETRY_TIME; i++) {
try {
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(insufficientLogEx, config);
break;
} catch (Exception e) {
LOG.warn("retry={}, reSetupBdbEnvironment exception:", i, e);
try {
Thread.sleep(5 * 1000);
LOG.warn("after sleep insufficientLogEx:", insufficientLogEx);
} catch (InterruptedException e1) {
LOG.warn("InterruptedException", e1);
}
}
}
bdbEnvironment.close();
bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort,
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()));
}
@Override
public long getJournalNum() {
return currentJournalDB.count();
}
@Override
public void deleteJournals(long deleteToJournalId) {
List<Long> dbNames = getDatabaseNames();
if (dbNames == null) {
LOG.info("delete database names is null.");
return;
}
String msg = "existing database names: ";
for (long name : dbNames) {
msg += name + " ";
}
msg += ", deleteToJournalId is " + deleteToJournalId;
LOG.info(msg);
for (int i = 1; i < dbNames.size(); i++) {
if (deleteToJournalId >= dbNames.get(i)) {
long name = dbNames.get(i - 1);
String stringName = Long.toString(name);
LOG.info("delete database name {}", stringName);
bdbEnvironment.removeDatabase(stringName);
} else {
LOG.info("database name {} is larger than deleteToJournalId {}, not delete",
dbNames.get(i), deleteToJournalId);
break;
}
}
}
@Override
public long getFinalizedJournalId() {
List<Long> dbNames = getDatabaseNames();
if (dbNames == null) {
LOG.error("database name is null.");
return 0;
}
String msg = "database names: ";
for (long name : dbNames) {
msg += name + " ";
}
LOG.info(msg);
if (dbNames.size() < 2) {
return 0;
}
return dbNames.get(dbNames.size() - 1) - 1;
}
@Override
public List<Long> getDatabaseNames() {
if (bdbEnvironment == null) {
return null;
}
// Open a new journal database or get last existing one as current journal
// database
List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
dbNames = bdbEnvironment.getDatabaseNames();
break;
} catch (InsufficientLogException insufficientLogEx) {
/*
* If this is not a checkpoint thread, which means this maybe the FE startup
* thread,
* or a replay thread. We will reopen bdbEnvironment for these 2 cases to get
* valid log
* from helper nodes.
*
* The checkpoint thread will only run on Master FE. And Master FE should not
* encounter
* these exception. So if it happens, throw exception out.
*/
if (!Env.isCheckpointThread()) {
reSetupBdbEnvironment(insufficientLogEx);
} else {
throw insufficientLogEx;
}
} catch (RollbackException rollbackEx) {
if (!Env.isCheckpointThread()) {
// Because Doris FE can not rollback its edit log, so it should restart and replay the new master's
// edit log.
if (rollbackEx.getEarliestTransactionId() != 0) {
LOG.error("Catch rollback log exception and it may have replayed outdated "
+ "logs, so exec System.exit(-1).", rollbackEx);
System.exit(-1);
}
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
bdbEnvironment.close();
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
} else {
throw rollbackEx;
}
}
}
return dbNames;
}
public BDBEnvironment getBDBEnvironment() {
return this.bdbEnvironment;
}
public String getBDBStats() {
if (bdbEnvironment == null) {
return "";
}
ReplicatedEnvironment repEnv = bdbEnvironment.getReplicatedEnvironment();
if (repEnv == null) {
return "";
}
return repEnv.getRepStats(StatsConfig.DEFAULT).toString();
}
public String getNotReadyReason() {
if (bdbEnvironment == null) {
LOG.warn("replicatedEnvironment is null");
return "replicatedEnvironment is null";
}
return bdbEnvironment.getNotReadyReason();
}
@Override
public boolean exceedMaxJournalSize(short op, Writable writable) throws IOException {
JournalEntity entity = new JournalEntity();
entity.setOpCode(op);
entity.setData(writable);
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
entity.write(buffer);
DatabaseEntry theData = new DatabaseEntry(buffer.getData());
if (LOG.isDebugEnabled()) {
LOG.debug("opCode = {}, journal size = {}", op, theData.getSize());
}
// 1GB
if (theData.getSize() > (1 << 30)) {
return true;
}
return false;
}
}