// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
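// DBBinlog manages all binlogs of a single database: it keeps the ordered set
// of db-level binlogs, a per-table view (TableBinlog), the commit-seq records
// of dropped partitions/tables/indexes, and the commit seqs locked by syncer jobs.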
public class DBBinlog {
private static final Logger LOG = LogManager.getLogger(DBBinlog.class);
private long dbId;
// The size of all binlogs.
private long binlogSize;
// Guards allBinlogs and tableBinlogMap.
private ReentrantReadWriteLock lock;
// All binlogs, including table binlogs, create-table binlogs, etc.
private TreeSet<TBinlog> allBinlogs;
// table binlogs
private Map<Long, TableBinlog> tableBinlogMap;
// Pair(commitSeq, timestamp), used for GC.
// Only UpsertRecord binlogs carry timestamps, so they drive time-based GC.
private List<Pair<Long, Long>> timestamps;
// Pair(partitionId, commitSeq) of the dropped partitions
private List<Pair<Long, Long>> droppedPartitions;
// Pair(tableId, commitSeq) of the dropped tables
private List<Pair<Long, Long>> droppedTables;
// Pair(indexId, commitSeq) of the dropped indexes
private List<Pair<Long, Long>> droppedIndexes;
private List<TBinlog> tableDummyBinlogs;
private BinlogConfigCache binlogConfigCache;
// The binlogs locked by syncer jobs.
// job unique id => locked commit seq
private Map<String, Long> lockedBinlogs;
public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
this.binlogConfigCache = binlogConfigCache;
this.binlogSize = 0;
// allBinlogs is a TreeSet ordered by commitSeq
allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
tableDummyBinlogs = Lists.newArrayList();
tableBinlogMap = Maps.newHashMap();
timestamps = Lists.newArrayList();
droppedPartitions = Lists.newArrayList();
droppedTables = Lists.newArrayList();
droppedIndexes = Lists.newArrayList();
lockedBinlogs = Maps.newHashMap();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
dummy = binlog;
} else {
dummy = BinlogUtils.newDummyBinlog(dbId, -1);
}
allBinlogs.add(dummy);
}
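// Rebuild a DBBinlog from the persisted dummy binlogs (one for the db, one per
// table); a table is skipped when neither db binlog nor its own binlog is enabled.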
public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy,
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
long dbId = dbDummy.getDbId();
for (TBinlog tableDummy : tableDummies) {
long tableId = tableDummy.getBelong();
if (!dbBinlogEnable && !binlogConfigCache.isEnableTable(dbId, tableId)) {
continue;
}
dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(binlogConfigCache, tableDummy, dbId, tableId));
dbBinlog.tableDummyBinlogs.add(tableDummy);
}
return dbBinlog;
}
// Not thread-safe; called during recovery without holding the lock.
public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
List<Long> tableIds = binlog.getTableIds();
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
recordDroppedOrRecoveredResources(binlog);
if (tableIds == null) {
return;
}
for (long tableId : tableIds) {
TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
if (tableBinlog == null) {
continue;
}
tableBinlog.recoverBinlog(binlog);
}
}
// TODO(Drogon): remove TableBinlog after DropTable; consider the interaction
// between table drop and recovery.
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) {
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
tableBinlogMap.put(tableId, tableBinlog);
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
}
}
return tableBinlog;
}
// Guarded by BinlogManager: addBinlog is only called when binlog is enabled
// on the db or on at least one of its tables.
public void addBinlog(TBinlog binlog, Object raw) {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();
lock.writeLock().lock();
try {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
if (tableIds == null) {
return;
}
// HACK: for metadata fix.
// Create-table and drop-table binlogs must not be added to the table binlogs.
if (!binlog.isSetType()) {
return;
}
recordDroppedOrRecoveredResources(binlog, raw);
switch (binlog.getType()) {
case CREATE_TABLE:
return;
case DROP_TABLE:
return;
default:
break;
}
for (long tableId : tableIds) {
TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
if (tableBinlog != null) {
tableBinlog.addBinlog(binlog);
}
}
} finally {
lock.writeLock().unlock();
}
}
public long getDbId() {
return dbId;
}
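// Fetch binlogs whose commit seq follows prevCommitSeq; numAcquired caps how
// many are returned. tableId < 0 selects the db-level binlogs, otherwise the
// given table's binlogs.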
public Pair<TStatus, List<TBinlog>> getBinlog(long tableId, long prevCommitSeq, long numAcquired) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
if (tableId >= 0) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
LOG.warn("table binlog not found. tableId: {}", tableId);
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
return tableBinlog.getBinlog(prevCommitSeq, numAcquired);
}
return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq, numAcquired);
} finally {
lock.readLock().unlock();
}
}
// Get the dropped partitions of the db.
public List<Pair<Long, Long>> getDroppedPartitions() {
lock.readLock().lock();
try {
return new ArrayList<>(droppedPartitions);
} finally {
lock.readLock().unlock();
}
}
// Get the dropped tables of the db.
public List<Pair<Long, Long>> getDroppedTables() {
lock.readLock().lock();
try {
return new ArrayList<>(droppedTables);
} finally {
lock.readLock().unlock();
}
}
// Get the dropped indexes of the db.
public List<Pair<Long, Long>> getDroppedIndexes() {
lock.readLock().lock();
try {
return new ArrayList<>(droppedIndexes);
} finally {
lock.readLock().unlock();
}
}
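// Report how far prevCommitSeq lags behind the newest binlog; tableId < 0
// reports the db-level lag, otherwise the table's lag.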
public Pair<TStatus, BinlogLagInfo> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
if (tableId >= 0) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
LOG.warn("table binlog not found. tableId: {}", tableId);
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
return tableBinlog.getBinlogLag(prevCommitSeq);
}
return BinlogUtils.getBinlogLag(allBinlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
}
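// Lock binlogs at lockCommitSeq for a syncer job so GC will not recycle them
// (see getMinLockedCommitSeq); a negative lockCommitSeq locks the latest binlog.
// Hypothetical usage (illustrative values only):
//   dbBinlog.lockBinlog(-1, "job-42", -1); // lock the latest db-level binlog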
public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) {
TableBinlog tableBinlog = null;
lock.writeLock().lock();
try {
if (tableId < 0) {
return lockDbBinlog(jobUniqueId, lockCommitSeq);
}
tableBinlog = tableBinlogMap.get(tableId);
} finally {
lock.writeLock().unlock();
}
if (tableBinlog == null) {
LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId);
return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L);
}
return tableBinlog.lockBinlog(jobUniqueId, lockCommitSeq);
}
// Requires: the caller holds the write lock.
private Pair<TStatus, Long> lockDbBinlog(String jobUniqueId, long lockCommitSeq) {
TBinlog firstBinlog = allBinlogs.first();
TBinlog lastBinlog = allBinlogs.last();
if (lockCommitSeq < 0) {
// lock the latest binlog
lockCommitSeq = lastBinlog.getCommitSeq();
} else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
// lock the first binlog
lockCommitSeq = firstBinlog.getCommitSeq();
} else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
}
// Keep idempotent: re-locking at or below the already-locked seq succeeds as a no-op.
Long commitSeq = lockedBinlogs.get(jobUniqueId);
if (commitSeq != null && lockCommitSeq <= commitSeq) {
LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}", commitSeq, jobUniqueId, dbId);
return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
}
lockedBinlogs.put(jobUniqueId, lockCommitSeq);
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}
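// Run one GC pass. If db binlog is disabled, each table GCs by its own config;
// otherwise the db config (TTL / maxBytes / maxHistoryNums) drives the GC.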
public BinlogTombstone gc() {
// check db
BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
if (dbBinlogConfig == null) {
LOG.error("db not found. dbId: {}", dbId);
return null;
} else if (!dbBinlogConfig.isEnable()) {
return dbBinlogDisableGc();
} else {
return dbBinlogEnableGc(dbBinlogConfig);
}
}
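// Merge per-table tombstones into a single db-level tombstone; returns null
// when there is nothing to GC.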
private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones, boolean isDbGc) {
if (tableTombstones.isEmpty()) {
return null;
}
BinlogTombstone dbTombstone = new BinlogTombstone(dbId, isDbGc);
for (BinlogTombstone tableTombstone : tableTombstones) {
// collect tableCommitSeq
dbTombstone.mergeTableTombstone(tableTombstone);
// collect tableVersionMap
Map<Long, UpsertRecord.TableRecord> tableVersionMap = tableTombstone.getTableVersionMap();
if (tableVersionMap.size() > 1) {
LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
}
dbTombstone.addTableRecord(tableVersionMap);
}
LOG.info("After GC, dbId: {}, dbExpiredBinlog: {}, tableExpiredBinlogs: {}",
dbId, dbTombstone.getCommitSeq(), dbTombstone.getTableCommitSeqMap());
return dbTombstone;
}
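// GC when db binlog is disabled: each TableBinlog is GCed independently, then
// expired db-level metadata is removed up to the collected commit seq.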
private BinlogTombstone dbBinlogDisableGc() {
List<BinlogTombstone> tombstones = Lists.newArrayList();
List<TableBinlog> tableBinlogs;
lock.readLock().lock();
try {
tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
} finally {
lock.readLock().unlock();
}
for (TableBinlog tableBinlog : tableBinlogs) {
BinlogTombstone tombstone = tableBinlog.gc();
if (tombstone != null) {
tombstones.add(tombstone);
}
}
BinlogTombstone tombstone = collectTableTombstone(tombstones, false);
if (tombstone != null) {
removeExpiredMetaData(tombstone.getCommitSeq());
}
return tombstone;
}
// Remove expired binlogs and dropped resources; used when db binlog GC is disabled.
private void removeExpiredMetaData(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
boolean foundFirstUsingBinlog = false;
long lastCommitSeq = -1;
long removed = 0;
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
long commitSeq = binlog.getCommitSeq();
if (commitSeq <= largestExpiredCommitSeq) {
if (binlog.table_ref <= 0) {
binlogIter.remove();
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
++removed;
if (!foundFirstUsingBinlog) {
lastCommitSeq = commitSeq;
}
} else {
foundFirstUsingBinlog = true;
}
} else {
break;
}
}
gcDroppedResources(largestExpiredCommitSeq);
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, dbId, allBinlogs.size());
} finally {
lock.writeLock().unlock();
}
}
// Get the last expired binlog, and GC expired binlogs, timestamps, stale locks,
// and dropped resources; used when db binlog GC is enabled.
private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
TBinlog lastExpiredBinlog = null;
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (checker.isExpired(binlog)) {
binlogIter.remove();
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
lastExpiredBinlog = binlog;
} else {
break;
}
}
if (lastExpiredBinlog != null) {
final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummy.setCommitSeq(expiredCommitSeq);
dummy.setTimestamp(lastExpiredBinlog.getTimestamp());
// release expired timestamps by commit seq.
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext() && timeIter.next().first <= expiredCommitSeq) {
timeIter.remove();
}
lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq);
gcDroppedResources(expiredCommitSeq);
}
return lastExpiredBinlog;
}
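// The smallest locked commit seq across the db and all its tables; binlogs at
// or above it must not be recycled.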
private Optional<Long> getMinLockedCommitSeq() {
lock.readLock().lock();
try {
Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo);
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
Optional<Long> tableMinLockedCommitSeq = tableBinlog.getMinLockedCommitSeq();
if (!tableMinLockedCommitSeq.isPresent()) {
continue;
}
if (minLockedCommitSeq.isPresent()) {
minLockedCommitSeq = Optional.of(Math.min(minLockedCommitSeq.get(), tableMinLockedCommitSeq.get()));
} else {
minLockedCommitSeq = tableMinLockedCommitSeq;
}
}
return minLockedCommitSeq;
} finally {
lock.readLock().unlock();
}
}
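// GC when db binlog is enabled: compute the expiration watermark from the
// timestamps (TTL), extend it toward the minimum locked commit seq when locks
// exist, drop expired db-level binlogs, then propagate the watermark to every
// TableBinlog.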
private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long maxBytes = dbBinlogConfig.getMaxBytes();
long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, maxHistoryNums: {}",
dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
// step 1: get current tableBinlog info and expiredCommitSeq
Optional<Long> minLockedCommitSeq = getMinLockedCommitSeq();
TBinlog lastExpiredBinlog = null;
List<TableBinlog> tableBinlogs = Lists.newArrayList();
lock.writeLock().lock();
try {
long expiredCommitSeq = -1L;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
if (pair.second > expiredMs) {
break;
}
expiredCommitSeq = pair.first;
}
// Speed up GC by recycling binlogs that are not locked by any syncer.
// To stay compatible with older versions, if no binlog is locked here, fall
// back to the previous behavior (keep all binlogs until they expire).
if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) {
expiredCommitSeq = minLockedCommitSeq.get() - 1L;
}
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator checker = (binlog) -> {
// NOTE: reading the TreeSet size while removing via its iterator is valid.
//
// A binlog is expired when any of these holds, checked in order:
// 1. its commit seq is at or below the expiration watermark;
// 2. the total binlog size exceeds maxBytes;
// 3. the number of kept binlogs exceeds maxHistoryNums.
return binlog.getCommitSeq() <= lastExpiredCommitSeq
|| maxBytes < binlogSize
|| maxHistoryNums < allBinlogs.size();
};
lastExpiredBinlog = getLastExpiredBinlog(checker);
tableBinlogs.addAll(tableBinlogMap.values());
} finally {
lock.writeLock().unlock();
}
if (lastExpiredBinlog == null) {
return null;
}
// step 2: GC every tableBinlog in this dbBinlog, collecting table tombstones
// to complete the db tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogs) {
// step 2.1: GC the tableBinlog, and get the table tombstone
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
tableTombstones.add(tableTombstone);
}
}
return collectTableTombstone(tableTombstones, true);
}
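// Replay a GC tombstone during edit-log replay: a db tombstone first trims the
// db-level binlogs, then (for both kinds) the per-table binlogs are trimmed.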
public void replayGc(BinlogTombstone tombstone) {
if (tombstone.isDbBinlogTomstone()) {
dbBinlogEnableReplayGc(tombstone);
} else {
dbBinlogDisableReplayGc(tombstone);
removeExpiredMetaData(tombstone.getCommitSeq());
}
}
public void dbBinlogEnableReplayGc(BinlogTombstone tombstone) {
long largestExpiredCommitSeq = tombstone.getCommitSeq();
lock.writeLock().lock();
try {
BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq;
getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
dbBinlogDisableReplayGc(tombstone);
}
public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
List<TableBinlog> tableBinlogs;
lock.readLock().lock();
try {
tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
} finally {
lock.readLock().unlock();
}
if (tableBinlogs.isEmpty()) {
return;
}
Map<Long, Long> tableCommitSeqMap = tombstone.getTableCommitSeqMap();
for (TableBinlog tableBinlog : tableBinlogs) {
long tableId = tableBinlog.getTableId();
if (tableCommitSeqMap.containsKey(tableId)) {
tableBinlog.replayGc(tableCommitSeqMap.get(tableId));
}
}
}
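// Discard records of dropped partitions/tables/indexes whose commit seq lies
// below the GC watermark; both call sites hold the write lock.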
private void gcDroppedResources(long commitSeq) {
Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
iter = droppedTables.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
iter = droppedIndexes.iterator();
while (iter.hasNext() && iter.next().second < commitSeq) {
iter.remove();
}
}
// Not thread-safe; the caller must ensure exclusive access.
public void getAllBinlogs(List<TBinlog> binlogs) {
binlogs.addAll(tableDummyBinlogs);
binlogs.addAll(allBinlogs);
}
public void removeTable(long tableId) {
lock.writeLock().lock();
try {
tableBinlogMap.remove(tableId);
} finally {
lock.writeLock().unlock();
}
}
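// Fill one proc result row per binlog scope: a single db row when db binlog is
// enabled, otherwise one row per table.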
public void getBinlogInfo(BaseProcResult result) {
BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
String dbName = "(dropped)";
String dropped = "true";
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
dbName = db.getFullName();
dropped = "false";
}
lock.readLock().lock();
try {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
if (dbBinlogEnable) {
List<String> info = new ArrayList<>();
info.add(dbName);
String type = "db";
info.add(type);
String id = String.valueOf(dbId);
info.add(id);
info.add(dropped);
String binlogLength = String.valueOf(allBinlogs.size());
info.add(binlogLength);
String binlogSize = String.valueOf(this.binlogSize);
info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
long timestamp = timestamps.get(0).second;
firstBinlogCommittedTime = String.valueOf(timestamp);
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
}
info.add(firstBinlogCommittedTime);
info.add(readableFirstBinlogCommittedTime);
String lastBinlogCommittedTime = null;
String readableLastBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
long timestamp = timestamps.get(timestamps.size() - 1).second;
lastBinlogCommittedTime = String.valueOf(timestamp);
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
}
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
String binlogMaxBytes = null;
String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
info.add(binlogMaxBytes);
info.add(binlogMaxHistoryNums);
result.addRow(info);
} else {
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
tableBinlog.getBinlogInfo(db, result);
}
}
} finally {
lock.readLock().unlock();
}
}
private void recordDroppedOrRecoveredResources(TBinlog binlog) {
recordDroppedOrRecoveredResources(binlog, null);
}
// Record the dropped tables, indexes, and partitions.
private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw) {
recordDroppedOrRecoveredResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
}
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
if (raw == null) {
switch (binlogType) {
case DROP_PARTITION:
raw = DropPartitionInfo.fromJson(data);
break;
case DROP_TABLE:
raw = DropTableRecord.fromJson(data);
break;
case ALTER_JOB:
raw = AlterJobRecord.fromJson(data);
break;
case TRUNCATE_TABLE:
raw = TruncateTableRecord.fromJson(data);
break;
case REPLACE_TABLE:
raw = ReplaceTableOperationLog.fromJson(data);
break;
case DROP_ROLLUP:
raw = DropInfo.fromJson(data);
break;
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
case RECOVER_INFO:
raw = RecoverInfo.fromJson(data);
break;
case REPLACE_PARTITIONS:
raw = ReplacePartitionOperationLog.fromJson(data);
break;
default:
break;
}
if (raw == null) {
return;
}
}
recordDroppedOrRecoveredResources(binlogType, commitSeq, raw);
}
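// Apply a parsed record: add dropped partitions/tables/indexes, or remove them
// again when a RECOVER_INFO binlog restores the resource.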
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, Object raw) {
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
} else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, commitSeq));
}
} else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, commitSeq));
}
}
}
} else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
} else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) {
ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
if (!record.isSwapTable()) {
droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
}
} else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof DropInfo) {
long indexId = ((DropInfo) raw).getIndexId();
if (indexId > 0) {
droppedIndexes.add(Pair.of(indexId, commitSeq));
}
} else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
BarrierLog log = (BarrierLog) raw;
// Keep compatibility with Doris 2.0/2.1, where the binlog is nested inside a BarrierLog.
if (log.hasBinlog()) {
recordDroppedOrRecoveredResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
}
} else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof RecoverInfo)) {
RecoverInfo recoverInfo = (RecoverInfo) raw;
long partitionId = recoverInfo.getPartitionId();
long tableId = recoverInfo.getTableId();
if (partitionId > 0) {
droppedPartitions.removeIf(entry -> (entry.first == partitionId));
} else if (tableId > 0) {
droppedTables.removeIf(entry -> (entry.first == tableId));
}
} else if ((binlogType == TBinlogType.REPLACE_PARTITIONS) && (raw instanceof ReplacePartitionOperationLog)) {
ReplacePartitionOperationLog replacePartitionOperationLog = (ReplacePartitionOperationLog) raw;
for (Long partitionId : replacePartitionOperationLog.getReplacedPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
}
}
}