MetaWriter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.persist.meta;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Reference;
import org.apache.doris.common.io.CountingDataOutputStream;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;

/**
 * Image Format:
 * |- Image --------------------------------------|
 * | - Magic String (4 bytes)                     |
 * | - Header Length (4 bytes)                    |
 * | |- Header -----------------------------|     |
 * | | |- Json Header ---------------|      |     |
 * | | | - version                   |      |     |
 * | | | - other key/value(undecided)|      |     |
 * | | |-----------------------------|      |     |
 * | |--------------------------------------|     |
 * |                                              |
 * | |- Image Body -------------------------|     |
 * | | Object a                             |     |
 * | | Object b                             |     |
 * | | ...                                  |     |
 * | |--------------------------------------|     |
 * |                                              |
 * | |- Footer -----------------------------|     |
 * | | | - Checksum (8 bytes)               |     |
 * | | |- object index --------------|      |     |
 * | | | - index a                   |      |     |
 * | | | - index b                   |      |     |
 * | | | ...                         |      |     |
 * | | |-----------------------------|      |     |
 * | | - other value(undecided)             |     |
 * | |--------------------------------------|     |
 * | - Footer Length (8 bytes)                    |
 * | - Magic String (4 bytes)                     |
 * |----------------------------------------------|
 */

public class MetaWriter {
    private static final Logger LOG = LogManager.getLogger(MetaWriter.class);

    public static MetaWriter writer = new MetaWriter();

    private interface Delegate {
        long doWork(String name, WriteMethod method) throws IOException;
    }

    private interface WriteMethod {
        long write() throws IOException;
    }

    private Delegate delegate;

    public void setDelegate(CountingDataOutputStream dos, List<MetaIndex> indices) {
        this.delegate = (name, method) -> {
            indices.add(new MetaIndex(name, dos.getCount()));
            return method.write();
        };
    }

    public long doWork(String name, WriteMethod method) throws IOException {
        if (delegate == null) {
            return method.write();
        }
        return delegate.doWork(name, method);
    }

    public static void write(File imageFile, Env env) throws IOException {
        // save image does not need any lock. because only checkpoint thread will call this method.
        LOG.info("start to save image to {}. is ckpt: {}",
                imageFile.getAbsolutePath(), Env.isCheckpointThread());
        final Reference<Long> checksum = new Reference<>(0L);
        long saveImageStartTime = System.currentTimeMillis();
        // MetaHeader should use output stream in the future.
        long startPosition = MetaHeader.write(imageFile);
        List<MetaIndex> metaIndices = Lists.newArrayList();
        FileOutputStream imageFileOut = new FileOutputStream(imageFile, true);
        try (CountingDataOutputStream dos = new CountingDataOutputStream(new BufferedOutputStream(imageFileOut),
                startPosition)) {
            writer.setDelegate(dos, metaIndices);
            long replayedJournalId = env.getReplayedJournalId();
            // 1. write header first
            checksum.setRef(
                    writer.doWork("header", () -> env.saveHeader(dos, replayedJournalId, checksum.getRef())));
            // 2. write other modules
            for (MetaPersistMethod m : PersistMetaModules.MODULES_IN_ORDER) {
                checksum.setRef(writer.doWork(m.name, () -> {
                    try {
                        return (long) m.writeMethod.invoke(env, dos, checksum.getRef());
                    } catch (IllegalAccessException | InvocationTargetException e) {
                        LOG.warn("failed to write meta module: {}", m.name, e);
                        throw new RuntimeException(e);
                    }
                }));
            }
            // 3. force sync to disk
            imageFileOut.getChannel().force(true);
        }
        MetaFooter.write(imageFile, metaIndices, checksum.getRef());

        long saveImageEndTime = System.currentTimeMillis();
        LOG.info("finished save image {} in {} ms. checksum is {}, size is {}", imageFile.getAbsolutePath(),
                (saveImageEndTime - saveImageStartTime), checksum.getRef(), imageFile.length());
    }

}