LocalJournal.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.journal.local;
import org.apache.doris.common.io.Writable;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.persist.EditLogFileOutputStream;
import org.apache.doris.persist.EditLogOutputStream;
import org.apache.doris.persist.Storage;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class LocalJournal implements Journal {
private static final Logger LOG = LogManager.getLogger(LocalJournal.class);
private EditLogOutputStream outputStream = null;
private AtomicLong journalId = new AtomicLong(1);
private String imageDir;
private File currentEditFile;
public LocalJournal(String imageDir) {
this.imageDir = imageDir;
}
@Override
public void open() {
if (outputStream == null) {
try {
Storage storage = new Storage(imageDir);
this.journalId.set(getCurrentJournalId(storage.getEditsFileSequenceNumbers()));
long id = journalId.get();
if (storage.getEditsSeq() == 0) {
// there is no edits file, create first one
Preconditions.checkState(id == 1, id);
currentEditFile = new File(imageDir, "edits.1");
currentEditFile.createNewFile();
outputStream = new EditLogFileOutputStream(currentEditFile);
} else if (id == storage.getEditsSeq()) {
// there exist edits files, point to the latest one and set position to the end of file.
this.currentEditFile = storage.getEditsFile(id);
this.outputStream = new EditLogFileOutputStream(currentEditFile);
} else {
// create next edits file
currentEditFile = new File(imageDir, "edits." + (id + 1));
currentEditFile.createNewFile();
outputStream = new EditLogFileOutputStream(currentEditFile);
}
} catch (IOException e) {
LOG.error(e);
}
}
}
@Override
public synchronized void rollJournal() {
Storage storage;
try {
storage = new Storage(imageDir);
if (journalId.get() == storage.getEditsSeq()) {
LOG.warn("Does not need to roll! journalId: {}, editsSeq: {}", journalId.get(), storage.getEditsSeq());
return;
}
if (outputStream != null) {
outputStream.flush();
outputStream.close();
}
currentEditFile = new File(imageDir, "edits." + journalId.get());
currentEditFile.createNewFile();
outputStream = new EditLogFileOutputStream(currentEditFile);
} catch (IOException e) {
LOG.error(e);
}
}
@Override
public long getMaxJournalId() {
return 0;
}
@Override
public long getMinJournalId() {
return 0;
}
@Override
public long getJournalNum() {
return 0;
}
@Override
public void close() {
if (outputStream == null) {
return;
}
try {
outputStream.setReadyToFlush();
outputStream.flush();
outputStream.close();
} catch (IOException e) {
LOG.error(e);
}
}
@Override
public JournalEntity read(long journalId) {
return null;
}
@Override
public JournalCursor read(long fromKey, long toKey) {
JournalCursor cursor = LocalJournalCursor.getJournalCursor(imageDir, fromKey, toKey);
return cursor;
}
@Override
public synchronized long write(JournalBatch batch) throws IOException {
List<JournalBatch.Entity> entities = batch.getJournalEntities();
for (JournalBatch.Entity entity : entities) {
outputStream.write(entity.getOpCode(), entity.getBinaryData());
}
outputStream.setReadyToFlush();
outputStream.flush();
return journalId.getAndAdd(entities.size());
}
@Override
public synchronized long write(short op, Writable writable) throws IOException {
outputStream.write(op, writable);
outputStream.setReadyToFlush();
outputStream.flush();
return journalId.incrementAndGet();
}
@Override
public void deleteJournals(long deleteJournalToId) {
try {
Storage storage = new Storage(imageDir);
List<Long> numbers = storage.getEditsFileSequenceNumbers();
for (long number : numbers) {
if (number < deleteJournalToId) {
File file = new File(imageDir, "edits." + number);
if (file.exists()) {
file.delete();
}
}
}
} catch (IOException e) {
LOG.error(e);
}
}
@Override
public long getFinalizedJournalId() {
try {
Storage storage = new Storage(imageDir);
List<Long> numbers = storage.getEditsFileSequenceNumbers();
int size = numbers.size();
if (size > 1) {
return numbers.get(size - 1) - 1;
}
} catch (IOException e) {
LOG.error(e);
}
return 0;
}
private long getCurrentJournalId(List<Long> editFileNames) {
if (editFileNames.size() == 0) {
return 1;
}
long ret = editFileNames.get(editFileNames.size() - 1);
JournalCursor cursor = read(ret, -1);
while (cursor.next() != null) {
ret++;
}
return ret;
}
@Override
public List<Long> getDatabaseNames() {
throw new RuntimeException("Not Support");
}
@Override
public boolean exceedMaxJournalSize(short op, Writable writable) throws IOException {
return false;
}
}