BinlogManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.binlog;

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTableDefaultDistributionBucketNumOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class BinlogManager {
    private static final int BUFFER_SIZE = 16 * 1024;
    private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name")
            .add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime")
            .add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
            .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes")
            .add("BinlogMaxHistoryNums")
            .build();

    private static final Logger LOG = LogManager.getLogger(BinlogManager.class);

    private ReentrantReadWriteLock lock;
    private Map<Long, DBBinlog> dbBinlogMap;
    private BinlogConfigCache binlogConfigCache;

    public BinlogManager() {
        lock = new ReentrantReadWriteLock();
        dbBinlogMap = Maps.newHashMap();
        binlogConfigCache = new BinlogConfigCache();
    }

    private void afterAddBinlog(TBinlog binlog) {
        if (!binlog.isSetRemoveEnableCache()) {
            return;
        }
        if (!binlog.isRemoveEnableCache()) {
            return;
        }

        long dbId = binlog.getDbId();
        boolean onlyDb = true;
        if (binlog.isSetTableIds()) {
            for (long tableId : binlog.getTableIds()) {
                binlogConfigCache.remove(tableId);
                onlyDb = false;
            }
        }
        if (onlyDb) {
            binlogConfigCache.remove(dbId);
        }
    }

    private boolean isAsyncMvBinlog(TBinlog binlog) {
        if (!binlog.isSetTableIds()) {
            return false;
        }

        // Filter the binlogs belong to async materialized view, since we don't support async mv right now.
        for (long tableId : binlog.getTableIds()) {
            if (binlogConfigCache.isAsyncMvTable(binlog.getDbId(), tableId)) {
                LOG.debug("filter the async mv binlog, db {}, table {}, commit seq {}, ts {}, type {}, data {}",
                        binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(),
                        binlog.getType(), binlog.getData());
                return true;
            }
        }

        return false;
    }

    private boolean isTemporaryTable(TBinlog binlog) {
        if (!binlog.isSetTableIds()) {
            return false;
        }

        // Filter the binlogs belong to temporary table
        for (long tableId : binlog.getTableIds()) {
            if (binlogConfigCache.isTemporaryTable(binlog.getDbId(), tableId)) {
                LOG.debug("filter the temporary table binlog, db {}, table {}, commit seq {}, ts {}, type {}, data {}",
                        binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(),
                        binlog.getType(), binlog.getData());
                return true;
            }
        }

        return false;
    }

    private void addBinlog(TBinlog binlog, Object raw) {
        if (!Config.enable_feature_binlog) {
            return;
        }

        if (isAsyncMvBinlog(binlog) || isTemporaryTable(binlog)) {
            return;
        }

        LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}",
                binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(),
                binlog.getData());

        DBBinlog dbBinlog;
        lock.writeLock().lock();
        try {
            long dbId = binlog.getDbId();
            dbBinlog = dbBinlogMap.get(dbId);

            if (dbBinlog == null) {
                dbBinlog = new DBBinlog(binlogConfigCache, binlog);
                dbBinlogMap.put(dbId, dbBinlog);
            }
        } finally {
            lock.writeLock().unlock();
        }

        dbBinlog.addBinlog(binlog, raw);
    }

    private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
            String data, boolean removeEnableCache, Object raw) {
        if (!Config.enable_feature_binlog) {
            return;
        }

        TBinlog binlog = new TBinlog();
        // set commitSeq, timestamp, type, dbId, data
        binlog.setCommitSeq(commitSeq);
        binlog.setTimestamp(timestamp);
        binlog.setType(type);
        binlog.setDbId(dbId);
        binlog.setData(data);
        if (tableIds != null && !tableIds.isEmpty()) {
            binlog.setTableIds(tableIds);
        }
        binlog.setTableRef(0);
        binlog.setRemoveEnableCache(removeEnableCache);

        // Check if all db or table binlog is disable, return
        boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
        boolean anyEnable = dbBinlogEnable;
        if (tableIds != null) {
            for (long tableId : tableIds) {
                boolean tableBinlogEnable = binlogConfigCache.isEnableTable(dbId, tableId);
                anyEnable = anyEnable || tableBinlogEnable;
                if (anyEnable) {
                    break;
                }
            }
        }

        if (anyEnable) {
            addBinlog(binlog, raw);
        }

        afterAddBinlog(binlog);
    }

    public void addUpsertRecord(UpsertRecord upsertRecord) {
        long dbId = upsertRecord.getDbId();
        List<Long> tableIds = upsertRecord.getAllReleatedTableIds();
        long commitSeq = upsertRecord.getCommitSeq();
        long timestamp = upsertRecord.getTimestamp();
        TBinlogType type = TBinlogType.UPSERT;
        String data = upsertRecord.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, upsertRecord);
    }

    public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
        long dbId = addPartitionRecord.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(addPartitionRecord.getTableId());
        long commitSeq = addPartitionRecord.getCommitSeq();
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.ADD_PARTITION;
        String data = addPartitionRecord.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, addPartitionRecord);
    }

    public void addCreateTableRecord(CreateTableRecord createTableRecord) {
        long dbId = createTableRecord.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(createTableRecord.getTableId());
        long commitSeq = createTableRecord.getCommitSeq();
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.CREATE_TABLE;
        String data = createTableRecord.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, createTableRecord);
    }

    public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
        long dbId = dropPartitionInfo.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(dropPartitionInfo.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.DROP_PARTITION;
        String data = dropPartitionInfo.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, dropPartitionInfo);
    }

    public void addDropTableRecord(DropTableRecord record) {
        long dbId = record.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(record.getTableId());
        long commitSeq = record.getCommitSeq();
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.DROP_TABLE;
        String data = record.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
    }

    public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
        long dbId = alterJob.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(alterJob.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.ALTER_JOB;
        AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob);
        String data = alterJobRecord.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJobRecord);
    }

    public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();

        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info);
    }

    public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info);
    }

    // add Barrier log
    public void addBarrierLog(BarrierLog barrierLog, long commitSeq) {
        if (barrierLog == null) {
            return;
        }

        long dbId = barrierLog.getDbId();
        long tableId = barrierLog.getTableId();
        if (dbId == 0 || tableId == 0) {
            return;
        }

        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(tableId);
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.BARRIER;
        String data = barrierLog.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, barrierLog);
    }

    // add Modify partitions
    public void addModifyPartitions(BatchModifyPartitionsInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_PARTITIONS;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    // add Replace partition
    public void addReplacePartitions(ReplacePartitionOperationLog info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTblId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.REPLACE_PARTITIONS;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    // add Truncate Table
    public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTblId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.TRUNCATE_TABLE;
        TruncateTableRecord record = new TruncateTableRecord(info);
        String data = record.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
    }

    public void addTableRename(TableInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.RENAME_TABLE;
        String data = info.toJson();
        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addRollupRename(TableInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.RENAME_ROLLUP;
        String data = info.toJson();
        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addPartitionRename(TableInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.RENAME_PARTITION;
        String data = info.toJson();
        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addModifyComment(ModifyCommentOperationLog info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTblId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_COMMENT;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addColumnRename(TableRenameColumnInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.RENAME_COLUMN;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    // add Modify view
    public void addModifyViewDef(AlterViewInfo alterViewInfo, long commitSeq) {
        long dbId = alterViewInfo.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(alterViewInfo.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_VIEW_DEF;
        String data = alterViewInfo.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterViewInfo);
    }

    public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
        if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) {
            LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info);
            return;
        }

        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getOrigTblId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.REPLACE_TABLE;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addModifyDistributionNum(ModifyTableDefaultDistributionBucketNumOperationLog info, long commitSeq) {
        if (info.getBucketNum() <= 0 || info.getType() == null) {
            LOG.warn("skip modify distribution num binlog, because bucket num is invalid. info: {}", info);
            return;
        }

        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_DISTRIBUTION_BUCKET_NUM;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addModifyDistributionType(TableInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_DISTRIBUTION_TYPE;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) {
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) {
        long dbId = indexChangeJob.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(indexChangeJob.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
        String data = indexChangeJob.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob);
    }

    public void addDropRollup(DropInfo info, long commitSeq) {
        if (StringUtils.isEmpty(info.getIndexName())) {
            LOG.warn("skip drop rollup binlog, because indexName is empty. info: {}", info);
            return;
        }

        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.DROP_ROLLUP;
        String data = info.toJson();

        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    private boolean supportedRecoverInfo(RecoverInfo info) {
        //table name and partitionName added together.
        // recover table case, tablename must exist in newer version
        // recover partition case also table name must exist.
        // so checking only table name here.
        if (StringUtils.isEmpty(info.getTableName())) {
            LOG.warn("skip recover info binlog, because tableName is empty. info: {}", info);
            return false;
        }
        return true;
    }

    public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
        if (supportedRecoverInfo(info) == false) {
            return;
        }
        long dbId = info.getDbId();
        List<Long> tableIds = Lists.newArrayList();
        tableIds.add(info.getTableId());
        long timestamp = System.currentTimeMillis();
        TBinlogType type = TBinlogType.RECOVER_INFO;
        String data = info.toJson();
        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
    }

    // get binlog by dbId, return first binlog.version > version
    public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
        Pair<TStatus, List<TBinlog>> result = getBinlog(dbId, tableId, prevCommitSeq, 1);
        if (result.second != null && result.second.size() > 0) {
            return Pair.of(result.first, result.second.get(0));
        }
        return Pair.of(result.first, null);
    }

    // get binlogs by dbId, return the first N binlogs, which first binlog.version > prevCommitSeq
    public Pair<TStatus, List<TBinlog>> getBinlog(long dbId, long tableId, long prevCommitSeq, long numAcquired) {
        TStatus status = new TStatus(TStatusCode.OK);
        lock.readLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB);
                LOG.warn("dbBinlog not found. dbId: {}", dbId);
                return Pair.of(status, null);
            }

            return dbBinlog.getBinlog(tableId, prevCommitSeq, numAcquired);
        } finally {
            lock.readLock().unlock();
        }
    }

    // get binlog by dbId, return first binlog.version > version
    public Pair<TStatus, BinlogLagInfo> getBinlogLag(long dbId, long tableId, long prevCommitSeq) {
        TStatus status = new TStatus(TStatusCode.OK);
        lock.readLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB);
                LOG.warn("dbBinlog not found. dbId: {}", dbId);
                return Pair.of(status, null);
            }
            return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
        } finally {
            lock.readLock().unlock();
        }
    }

    public Pair<TStatus, Long> lockBinlog(long dbId, long tableId,
            String jobUniqueId, long lockCommitSeq) {
        LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}",
                dbId, tableId, jobUniqueId, lockCommitSeq);

        DBBinlog dbBinlog = null;
        lock.readLock().lock();
        try {
            dbBinlog = dbBinlogMap.get(dbId);
        } finally {
            lock.readLock().unlock();
        }

        if (dbBinlog == null) {
            LOG.warn("db binlog not found. dbId: {}", dbId);
            return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L);
        }
        return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq);
    }

    // get the dropped partitions of the db.
    public List<Pair<Long, Long>> getDroppedPartitions(long dbId) {
        lock.readLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                return Lists.newArrayList();
            }
            return dbBinlog.getDroppedPartitions();
        } finally {
            lock.readLock().unlock();
        }
    }

    // get the dropped tables of the db.
    public List<Pair<Long, Long>> getDroppedTables(long dbId) {
        lock.readLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                return Lists.newArrayList();
            }
            return dbBinlog.getDroppedTables();
        } finally {
            lock.readLock().unlock();
        }
    }

    // get the dropped indexes of the db.
    public List<Pair<Long, Long>> getDroppedIndexes(long dbId) {
        lock.readLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                return Lists.newArrayList();
            }
            return dbBinlog.getDroppedIndexes();
        } finally {
            lock.readLock().unlock();
        }
    }

    public List<BinlogTombstone> gc() {
        LOG.info("begin gc binlog");

        lock.writeLock().lock();
        Map<Long, DBBinlog> gcDbBinlogMap;
        try {
            gcDbBinlogMap = Maps.newHashMap(dbBinlogMap);
        } finally {
            lock.writeLock().unlock();
        }

        if (gcDbBinlogMap.isEmpty()) {
            LOG.info("gc binlog, dbBinlogMap is null");
            return null;
        }

        List<BinlogTombstone> tombstones = Lists.newArrayList();
        for (DBBinlog dbBinlog : gcDbBinlogMap.values()) {
            BinlogTombstone dbTombstones = dbBinlog.gc();
            if (dbTombstones != null) {
                tombstones.add(dbTombstones);
            }
        }

        return tombstones;
    }

    public void replayGc(BinlogGcInfo binlogGcInfo) {
        lock.writeLock().lock();
        Map<Long, DBBinlog> gcDbBinlogMap;
        try {
            gcDbBinlogMap = Maps.newHashMap(dbBinlogMap);
        } finally {
            lock.writeLock().unlock();
        }

        if (gcDbBinlogMap.isEmpty()) {
            LOG.info("replay gc binlog, dbBinlogMap is null");
            return;
        }

        for (BinlogTombstone tombstone : binlogGcInfo.getTombstones()) {
            long dbId = tombstone.getDbId();
            DBBinlog dbBinlog = gcDbBinlogMap.get(dbId);
            if (dbBinlog == null) {
                LOG.warn("dbBinlog not found. dbId: {}", dbId);
                continue;
            }
            dbBinlog.replayGc(tombstone);
        }
    }

    public void removeDB(long dbId) {
        lock.writeLock().lock();
        try {
            dbBinlogMap.remove(dbId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void removeTable(long dbId, long tableId) {
        lock.writeLock().lock();
        try {
            DBBinlog dbBinlog = dbBinlogMap.get(dbId);
            if (dbBinlog != null) {
                dbBinlog.removeTable(tableId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    private static void writeTBinlogToStream(DataOutputStream dos, TBinlog binlog) throws TException, IOException {
        TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
        TBinaryProtocol protocol = new TBinaryProtocol(buffer);
        binlog.write(protocol);
        byte[] data = buffer.getArray();
        dos.writeInt(data.length);
        dos.write(data);
    }

    // not thread safety, do this without lock
    public long write(DataOutputStream dos, long checksum) throws IOException {
        if (!Config.enable_feature_binlog) {
            return checksum;
        }

        List<TBinlog> binlogs = Lists.newArrayList();
        // Step 1: get all binlogs
        for (DBBinlog dbBinlog : dbBinlogMap.values()) {
            dbBinlog.getAllBinlogs(binlogs);
        }

        // Step 2: write binlogs length
        dos.writeInt(binlogs.size());
        LOG.info("write binlogs length: {}", binlogs.size());

        // Step 3: write all binlogs to dos
        // binlog is a thrift type TBinlog
        for (TBinlog binlog : binlogs) {
            try {
                writeTBinlogToStream(dos, binlog);
            } catch (TException e) {
                throw new IOException("failed to write binlog to TMemoryBuffer");
            }
        }

        return checksum;
    }

    public TBinlog readTBinlogFromStream(DataInputStream dis) throws TException, IOException {
        // We assume that the first int is the length of the serialized data.
        int length = dis.readInt();
        byte[] data = new byte[length];
        dis.readFully(data);
        Boolean isLargeBinlog = length > 8 * 1024 * 1024;
        if (isLargeBinlog) {
            LOG.info("a large binlog length {}", length);
        }

        TMemoryInputTransport transport = new TMemoryInputTransport();
        transport.getConfiguration().setMaxMessageSize(Config.max_binlog_messsage_size);
        transport.reset(data);

        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        TBinlog binlog = new TBinlog();
        binlog.read(protocol);

        if (isLargeBinlog) {
            LOG.info("a large binlog length {} type {}", length, binlog.type);
        }
        return binlog;
    }

    // db TBinlogs in file struct:
    // (tableDummy)TBinlog.belong == tableId, (dbDummy)TBinlog.belong == -1
    // +---------------------------+------------------+-----------------------------------+
    // | (tableDummy)TBinlog | ... | (dbDummy)TBinlog | TBinlog | TBinlog | TBinlog | ... |
    // +---------------------------+------------------+-----------------------------------+
    // |        Unnecessary        |     Necessary    |             Unnecessary           |
    // +---------------------------+------------------+-----------------------------------+
    public long read(DataInputStream dis, long checksum) throws IOException {
        // Step 1: read binlogs length
        int size = dis.readInt();
        LOG.info("read binlogs length: {}", size);

        // Step 2: read all binlogs from dis
        long currentDbId = -1;
        boolean currentDbBinlogEnable = false;
        List<TBinlog> tableDummies = Lists.newArrayList();
        try {
            for (int i = 0; i < size; i++) {
                // Step 2.1: read a binlog
                TBinlog binlog = readTBinlogFromStream(dis);

                if (!Config.enable_feature_binlog) {
                    continue;
                }

                long dbId = binlog.getDbId();
                if (binlog.getType().getValue() >= TBinlogType.MIN_UNKNOWN.getValue()) {
                    LOG.warn("skip unknown binlog, type: {}, db: {}", binlog.getType().getValue(), dbId);
                    continue;
                }

                // Step 2.2: check if there is in next db Binlogs region
                if (dbId != currentDbId) {
                    // if there is in next db Binlogs region, check and update metadata
                    Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
                    if (db == null) {
                        LOG.warn("db not found. dbId: {}", dbId);
                        continue;
                    }
                    currentDbId = dbId;
                    currentDbBinlogEnable = db.getBinlogConfig().isEnable();
                    tableDummies = Lists.newArrayList();
                }

                // step 2.3: recover binlog
                if (binlog.getType() == TBinlogType.DUMMY) {
                    // collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog
                    if (binlog.getBelong() == -1) {
                        DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies,
                                currentDbBinlogEnable);
                        dbBinlogMap.put(dbId, dbBinlog);
                    } else {
                        tableDummies.add(binlog);
                    }
                } else {
                    // recover common binlogs
                    DBBinlog dbBinlog = dbBinlogMap.get(dbId);
                    if (dbBinlog == null) {
                        LOG.warn("dbBinlog recover fail! binlog {} is before dummy. dbId: {}", binlog, dbId);
                        continue;
                    }
                    binlog.setTableRef(0);
                    dbBinlog.recoverBinlog(binlog, currentDbBinlogEnable);
                }
            }
        } catch (TException e) {
            throw new IOException("failed to read binlog from TMemoryBuffer", e);
        }

        return checksum;
    }

    public ProcResult getBinlogInfo() {
        BaseProcResult result = new BaseProcResult();
        result.setNames(TITLE_NAMES);

        lock.readLock().lock();
        try {
            for (DBBinlog dbBinlog : dbBinlogMap.values()) {
                dbBinlog.getBinlogInfo(result);
            }
        } finally {
            lock.readLock().unlock();
        }

        return result;
    }

    // remove DB
    // remove Table
}