BDBJournalCursor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.List;
public class BDBJournalCursor implements JournalCursor {
private static final Logger LOG = LogManager.getLogger(BDBJournalCursor.class);
private long toKey;
private long currentKey;
private BDBEnvironment environment;
private List<Long> dbNames;
private Database database;
private int nextDbPositionIndex;
private final int maxTryTime = 3;
public static BDBJournalCursor getJournalCursor(BDBEnvironment env, long fromKey, long toKey) {
if (toKey < fromKey || fromKey < 0) {
LOG.warn("Invalid key range! fromKey:{} toKey:{}", fromKey, toKey);
return null;
}
BDBJournalCursor cursor = null;
try {
cursor = new BDBJournalCursor(env, fromKey, toKey);
} catch (Exception e) {
LOG.error("new BDBJournalCursor error.", e);
}
return cursor;
}
private BDBJournalCursor(BDBEnvironment env, long fromKey, long toKey) throws Exception {
this.environment = env;
this.toKey = toKey;
this.currentKey = fromKey;
this.dbNames = env.getDatabaseNames();
if (dbNames == null) {
throw new NullPointerException("dbNames is null.");
}
this.nextDbPositionIndex = 0;
// find the db which may contain the fromKey
String dbName = null;
for (long db : dbNames) {
if (fromKey >= db) {
dbName = Long.toString(db);
nextDbPositionIndex++;
} else {
break;
}
}
if (dbName == null) {
LOG.error("Can not find the key:{}, fail to get journal cursor. will exit.", fromKey);
System.exit(-1);
}
this.database = env.openDatabase(dbName);
}
@Override
public Pair<Long, JournalEntity> next() {
if (currentKey > toKey) {
return null;
}
if (Env.getCurrentEnv().getForceSkipJournalIds().contains(String.valueOf(currentKey))) {
return Pair.of(currentKey++, null);
}
Long key = currentKey;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class);
myBinding.objectToEntry(key, theKey);
DatabaseEntry theData = new DatabaseEntry();
// if current db does not contain any more data, then we go to search the next db
try {
// null means perform the operation without transaction protection.
// READ_COMMITTED guarantees no dirty read.
int tryTimes = 0;
while (true) {
OperationStatus operationStatus = database.get(null, theKey, theData, LockMode.READ_COMMITTED);
if (operationStatus == OperationStatus.SUCCESS) {
// Recreate the data String.
byte[] retData = theData.getData();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData));
JournalEntity entity = new JournalEntity();
try {
entity.readFields(in);
entity.setDataSize(retData.length);
} catch (Exception e) {
LOG.error("fail to read journal entity key={}, will exit", currentKey, e);
System.exit(-1);
}
currentKey++;
return Pair.of(key, entity);
} else if (nextDbPositionIndex < dbNames.size() && currentKey == dbNames.get(nextDbPositionIndex)) {
database = environment.openDatabase(dbNames.get(nextDbPositionIndex).toString());
nextDbPositionIndex++;
tryTimes = 0;
} else if (tryTimes < maxTryTime) {
tryTimes++;
LOG.warn("fail to get journal {}, will try again. status: {}", currentKey, operationStatus);
Thread.sleep(3000);
} else if (operationStatus == OperationStatus.NOTFOUND) {
// In the case:
// On non-master FE, the replayer will first get the max journal id,
// than try to replay logs from current replayed id to the max journal id. But when
// master FE try to write a log to bdbje, but crashed before this log is committed,
// the non-master FE may still get this incomplete log's id as max journal id,
// and try to replay it. We will first get LockTimeoutException (because the transaction
// is hanging and waiting to be aborted after timeout). and after this log abort,
// we will get NOTFOUND.
// So we simply throw a exception and let the replayer get the max id again.
throw new Exception(
"Failed to find key " + currentKey + " in database " + database.getDatabaseName());
} else {
LOG.error("fail to get journal {}, status: {}, will exit", currentKey, operationStatus);
System.exit(-1);
}
}
} catch (Exception e) {
LOG.warn("Catch an exception when get next JournalEntity. key:{}", currentKey, e);
return null;
}
}
@Override
public void close() {
}
}