// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
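
// In-memory binlog store for a single table.
//
// Binlogs are kept in a TreeSet ordered by commit seq, headed by a dummy binlog whose commit seq
// marks the gc watermark. Gc is driven either by a db-level expired commit seq (commitSeqGc) or by
// the table's own binlog config: ttl, max bytes and max history num (gc). Syncers record locked
// commit seqs via lockBinlog; the minimum locked seq is exposed through getMinLockedCommitSeq and
// is also used to speed up table-level gc.
//
// Illustrative usage (a sketch only; the real call sites live in the binlog manager code):
//
//   TableBinlog tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
//   tableBinlog.addBinlog(binlog);
//   Pair<TStatus, List<TBinlog>> batch = tableBinlog.getBinlog(prevCommitSeq, 10);
//   BinlogTombstone tombstone = tableBinlog.gc();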
public class TableBinlog {
private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
private long dbId;
private long tableId;
private long binlogSize;
private ReentrantReadWriteLock lock;
private TreeSet<TBinlog> binlogs;
    // Pair(commitSeq, timestamp), used for time-based gc.
    // Relies on UpsertRecord binlogs to supply the timestamps.
private List<Pair<Long, Long>> timestamps;
private BinlogConfigCache binlogConfigCache;
// The binlogs that are locked by the syncer.
// syncer id => commit seq
private Map<String, Long> lockedBinlogs;
public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) {
this.dbId = dbId;
this.tableId = tableId;
this.binlogSize = 0;
lock = new ReentrantReadWriteLock();
binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
timestamps = Lists.newArrayList();
lockedBinlogs = Maps.newHashMap();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
dummy = binlog;
} else {
dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId);
}
binlogs.add(dummy);
this.binlogConfigCache = binlogConfigCache;
}
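
    // The dummy binlog is always the first element; its commit seq records the gc watermark.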
public TBinlog getDummyBinlog() {
return binlogs.first();
}
public long getTableId() {
return tableId;
}
    // Not thread safe; called during recovery without taking the lock.
public void recoverBinlog(TBinlog binlog) {
TBinlog dummy = getDummyBinlog();
if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
addBinlogWithoutCheck(binlog);
}
}
public void addBinlog(TBinlog binlog) {
lock.writeLock().lock();
try {
addBinlogWithoutCheck(binlog);
} finally {
lock.writeLock().unlock();
}
}
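
    // Adds the binlog without taking the lock: bumps its table_ref, accounts its approximate
    // memory usage, and records (commitSeq, timestamp) for time-based gc when a timestamp is set.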
private void addBinlogWithoutCheck(TBinlog binlog) {
binlogs.add(binlog);
++binlog.table_ref;
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
}
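
    // Fetches binlogs with commit seq after prevCommitSeq, limited by numAcquired
    // (delegates to BinlogUtils.getBinlog).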
public Pair<TStatus, List<TBinlog>> getBinlog(long prevCommitSeq, long numAcquired) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlog(binlogs, prevCommitSeq, numAcquired);
} finally {
lock.readLock().unlock();
}
}
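
    // Computes the binlog lag info relative to prevCommitSeq (delegates to BinlogUtils.getBinlogLag).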
public Pair<TStatus, BinlogLagInfo> getBinlogLag(long prevCommitSeq) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
}
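
    // Records the commit seq locked by a syncer job. A negative lockCommitSeq locks the latest
    // binlog, a seq below the first binlog is raised to the first, and a seq beyond the last
    // binlog is rejected with BINLOG_TOO_NEW_COMMIT_SEQ. Returns the commit seq actually locked.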
public Pair<TStatus, Long> lockBinlog(String jobUniqueId, long lockCommitSeq) {
lock.writeLock().lock();
try {
return lockTableBinlog(jobUniqueId, lockCommitSeq);
} finally {
lock.writeLock().unlock();
}
}
    // Requires: the write lock is held by the caller.
private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitSeq) {
TBinlog firstBinlog = binlogs.first();
TBinlog lastBinlog = binlogs.last();
if (lockCommitSeq < 0) {
// lock the latest binlog
lockCommitSeq = lastBinlog.getCommitSeq();
} else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
// lock the first binlog
lockCommitSeq = firstBinlog.getCommitSeq();
} else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
LOG.warn(
"try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
}
        // keep the lock idempotent: re-locking at or below the job's current commit seq is a no-op
Long commitSeq = lockedBinlogs.get(jobUniqueId);
if (commitSeq != null && lockCommitSeq <= commitSeq) {
LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}",
commitSeq, jobUniqueId, dbId, tableId);
return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
}
lockedBinlogs.put(jobUniqueId, lockCommitSeq);
return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
}
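
    // Returns the smallest commit seq currently locked by any syncer job, if one exists.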
public Optional<Long> getMinLockedCommitSeq() {
lock.readLock().lock();
try {
return lockedBinlogs.values().stream().min(Long::compareTo);
} finally {
lock.readLock().unlock();
}
}
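
    // Removes expired binlogs from the head of the set (per the given checker), advances the
    // dummy binlog to the last expired commit seq, drops stale timestamps and released locks,
    // and returns (last expired UPSERT binlog, largest expired commit seq), or null if nothing
    // expired. Requires: the write lock is held by the caller.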
private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
}
Iterator<TBinlog> iter = binlogs.iterator();
TBinlog dummyBinlog = iter.next();
TBinlog tombstoneUpsert = null;
TBinlog lastExpiredBinlog = null;
while (iter.hasNext()) {
TBinlog binlog = iter.next();
if (checker.isExpired(binlog)) {
lastExpiredBinlog = binlog;
--binlog.table_ref;
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getType() == TBinlogType.UPSERT) {
tombstoneUpsert = binlog;
}
iter.remove();
} else {
break;
}
}
if (lastExpiredBinlog == null) {
return null;
}
final long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummyBinlog.setCommitSeq(expiredCommitSeq);
dummyBinlog.setTimestamp(lastExpiredBinlog.getTimestamp());
Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
while (timeIterator.hasNext() && timeIterator.next().first <= expiredCommitSeq) {
timeIterator.remove();
}
lockedBinlogs.entrySet().removeIf(ent -> ent.getValue() <= expiredCommitSeq);
return Pair.of(tombstoneUpsert, expiredCommitSeq);
}
    // Called when the db-level binlog is enabled; gc is driven by the db's expired commit seq.
public BinlogTombstone commitSeqGc(long expiredCommitSeq) {
Pair<TBinlog, Long> tombstoneInfo;
// step 1: get tombstoneUpsertBinlog and dummyBinlog
lock.writeLock().lock();
try {
BinlogComparator check = (binlog) -> binlog.getCommitSeq() <= expiredCommitSeq;
tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
        // step 2: build the tombstone from tombstoneInfo
        // if there are expired binlogs, tombstoneInfo != null
if (tombstoneInfo == null) {
return null;
}
TBinlog lastUpsertBinlog = tombstoneInfo.first;
long largestCommitSeq = tombstoneInfo.second;
BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq);
if (lastUpsertBinlog != null) {
UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
tombstone.addTableRecord(tableId, upsertRecord);
}
return tombstone;
}
    // Called when the db-level binlog is disabled; gc is driven by the table's own binlog config.
public BinlogTombstone gc() {
// step 1: get expire time
BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
        boolean isCleanFullBinlog = false;
if (tableBinlogConfig == null) {
return null;
} else if (!tableBinlogConfig.isEnable()) {
isCleanFullBinlog = true;
}
long ttlSeconds = tableBinlogConfig.getTtlSeconds();
long maxBytes = tableBinlogConfig.getMaxBytes();
long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
LOG.info(
"gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, "
+ "maxHistoryNums: {}, now: {}, isCleanFullBinlog: {}",
dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis(),
isCleanFullBinlog);
// step 2: get tombstoneUpsertBinlog and dummyBinlog
Pair<TBinlog, Long> tombstoneInfo;
lock.writeLock().lock();
try {
long expiredCommitSeq = -1;
if (isCleanFullBinlog) {
expiredCommitSeq = binlogs.last().getCommitSeq();
} else {
// find the last expired commit seq.
Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
while (timeIterator.hasNext()) {
Pair<Long, Long> entry = timeIterator.next();
if (expiredMs < entry.second) {
break;
}
expiredCommitSeq = entry.first;
}
                // If the min locked commit seq is ahead of the expired commit seq, gc can safely
                // advance up to just before it.
Optional<Long> minLockedCommitSeq = lockedBinlogs.values().stream().min(Long::compareTo);
if (minLockedCommitSeq.isPresent() && expiredCommitSeq + 1L < minLockedCommitSeq.get()) {
// Speed up the gc progress by the min locked commit seq.
expiredCommitSeq = minLockedCommitSeq.get() - 1L;
}
}
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator check = (binlog) -> {
                // NOTE: reading TreeSet.size() while removing elements through its iterator is valid.
                //
                // A binlog is treated as expired if any of the following holds, checked in order:
                // 1. its commit seq is not newer than the last expired commit seq
                // 2. the total binlog size exceeds max bytes
                // 3. the number of binlogs exceeds max history num
return binlog.getCommitSeq() <= lastExpiredCommitSeq
|| maxBytes < binlogSize
|| maxHistoryNums < binlogs.size();
};
tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
        // step 3: build the tombstone from tombstoneInfo
        // if there are expired binlogs, tombstoneInfo != null
if (tombstoneInfo == null) {
return null;
}
TBinlog lastUpsertBinlog = tombstoneInfo.first;
long largestCommitSeq = tombstoneInfo.second;
BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq);
if (lastUpsertBinlog != null) {
UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
tombstone.addTableRecord(tableId, upsertRecord);
}
return tombstone;
}
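
    // Replays a gc up to largestExpiredCommitSeq (e.g. when applying a persisted gc record);
    // no tombstone is produced.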
public void replayGc(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq;
getLastUpsertAndLargestCommitSeq(checker);
} finally {
lock.writeLock().unlock();
}
}
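
    // Fills one proc result row for this table: name, id, dropped flag, binlog count and size,
    // first/last binlog committed time, and the table's binlog config (ttl, max bytes, max history num).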
public void getBinlogInfo(Database db, BaseProcResult result) {
BinlogConfig binlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
String tableName = null;
String dropped = null;
if (db == null) {
tableName = "(dropped).(unknown)";
dropped = "true";
} else {
String dbName = db.getFullName();
Table table = db.getTableNullable(tableId);
            if (table == null) {
                dropped = "true";
                tableName = dbName + ".(dropped)";
            } else {
                dropped = "false";
                if (table instanceof OlapTable) {
                    OlapTable olapTable = (OlapTable) table;
                    tableName = dbName + "." + olapTable.getName();
                } else {
                    tableName = dbName + ".(not_olaptable)";
                }
            }
}
lock.readLock().lock();
try {
List<String> info = new ArrayList<>();
info.add(tableName);
String type = "table";
info.add(type);
String id = String.valueOf(tableId);
info.add(id);
info.add(dropped);
String binlogLength = String.valueOf(binlogs.size());
info.add(binlogLength);
String binlogSize = String.valueOf(this.binlogSize);
info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
for (TBinlog binlog : binlogs) {
long timestamp = binlog.getTimestamp();
if (timestamp != -1) {
firstBinlogCommittedTime = String.valueOf(timestamp);
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
break;
}
}
info.add(firstBinlogCommittedTime);
info.add(readableFirstBinlogCommittedTime);
String lastBinlogCommittedTime = null;
String readableLastBinlogCommittedTime = null;
Iterator<TBinlog> iterator = binlogs.descendingIterator();
while (iterator.hasNext()) {
TBinlog binlog = iterator.next();
long timestamp = binlog.getTimestamp();
if (timestamp != -1) {
lastBinlogCommittedTime = String.valueOf(timestamp);
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
break;
}
}
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
String binlogMaxBytes = null;
String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
info.add(binlogMaxBytes);
info.add(binlogMaxHistoryNums);
result.addRow(info);
} finally {
lock.readLock().unlock();
}
}
}