EditLogFileOutputStream.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* An implementation of the abstract class {@link EditLogOutputStream},
* which stores edits in a local file.
*/
public class EditLogFileOutputStream extends EditLogOutputStream {
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
private DataOutputBuffer bufCurrent; // current buffer for writing
private DataOutputBuffer bufReady; // buffer ready for flushing
static ByteBuffer fill = ByteBuffer.allocateDirect(512); // pre-allocation
private static int sizeFlushBuffer = 512 * 1024;
public EditLogFileOutputStream(File name) throws IOException {
super();
file = name;
bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
bufReady = new DataOutputBuffer(sizeFlushBuffer);
RandomAccessFile rp = new RandomAccessFile(name, "rw");
fp = new FileOutputStream(rp.getFD()); // open for append
fc = rp.getChannel();
fc.position(fc.size());
}
String getName() {
return file.getPath();
}
public void write(int b) throws IOException {
bufCurrent.write(b);
}
public void write(short op, Writable writable) throws IOException {
bufCurrent.writeShort(op);
writable.write(bufCurrent);
}
public void write(short op, byte[] data) throws IOException {
bufCurrent.writeShort(op);
bufCurrent.write(data);
}
// Create empty edits logs file.
void create() throws IOException {
fc.truncate(0);
fc.position(0);
setReadyToFlush();
flush();
}
public void close() throws IOException {
// close should have been called after all pending transactions
// have been flushed & synced.
int bufSize = bufCurrent.size();
if (bufSize != 0) {
throw new IOException("EditStream has " + bufSize
+ " bytes still to be flushed and cannot "
+ "be closed.");
}
bufCurrent.close();
bufReady.close();
// remove the last INVALID marker from transaction log.
fc.truncate(fc.position());
fp.close();
bufCurrent = null;
bufReady = null;
}
/**
* All data that has been written to the stream so far will be flushed.
* New data can be still written to the stream while flushing is performed.
*/
public void setReadyToFlush() throws IOException {
assert bufReady.size() == 0 : "previous data is not flushed yet";
write(OperationType.OP_LOCAL_EOF); // insert end-of-file marker
DataOutputBuffer tmp = bufReady;
bufReady = bufCurrent;
bufCurrent = tmp;
}
/**
* Flush ready buffer to persistent store. currentBuffer is not flushed
* as it accumulates new log records while readyBuffer will be flushed and synced.
*/
protected void flushAndSync() throws IOException {
preallocate(); // preallocate file if necessary
bufReady.writeTo(fp); // write data to file
bufReady.reset(); // erase all data in the buffer
fc.force(false); // metadata updates not needed because of preallocation
fc.position(fc.position() - 1); // skip back the end-of-file marker
}
// Return the size of the current edit log including buffered data.
long length() throws IOException {
return fc.size() + bufReady.size() + bufCurrent.size();
}
// Allocate a big chunk of data
private void preallocate() throws IOException {
long position = fc.position();
if (position + 4096 >= fc.size()) {
// use pre allocate , then file size will not be changed very time,
// so fsync needn't update file size every time. This is an optimization.
long newSize = position + 1024 * 1024; // 1MB
fill.position(0);
fc.write(fill, newSize);
}
}
File getFile() {
return file;
}
}