CatalogRecycleBin.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CatalogRecycleBin extends MasterDaemon implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
    // erase meta only after at least minEraseLatency milliseconds have passed,
    // to avoid the erase log being written ahead of the drop log
private static final long minEraseLatency = 10 * 60 * 1000; // 10 min
private Map<Long, RecycleDatabaseInfo> idToDatabase;
private Map<Long, RecycleTableInfo> idToTable;
private Map<Long, RecyclePartitionInfo> idToPartition;
private Map<Long, Long> idToRecycleTime;
    // for compatibility in the future
@SerializedName("u")
String unused;
public CatalogRecycleBin() {
super("recycle bin", FeConstants.runningUnitTest ? 10L : DEFAULT_INTERVAL_SECONDS * 1000L);
idToDatabase = Maps.newHashMap();
idToTable = Maps.newHashMap();
idToPartition = Maps.newHashMap();
idToRecycleTime = Maps.newHashMap();
}
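    // Returns true only if every given backend tablet id is found among the tablets of
    // recycled partitions, tables and databases, i.e. all of them are in recycled status.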
public synchronized boolean allTabletsInRecycledStatus(List<Long> backendTabletIds) {
Set<Long> recycledTabletSet = Sets.newHashSet();
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
addRecycledTabletsForPartition(recycledTabletSet, partition);
}
Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = idToTable.entrySet().iterator();
while (tableIter.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
RecycleTableInfo tableInfo = entry.getValue();
Table table = tableInfo.getTable();
addRecycledTabletsForTable(recycledTabletSet, table);
}
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator = idToDatabase.entrySet().iterator();
while (dbIterator.hasNext()) {
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
for (Table table : db.getTables()) {
addRecycledTabletsForTable(recycledTabletSet, table);
}
}
return recycledTabletSet.size() >= backendTabletIds.size() && recycledTabletSet.containsAll(backendTabletIds);
}
private void addRecycledTabletsForTable(Set<Long> recycledTabletSet, Table table) {
if (table.isManagedTable()) {
OlapTable olapTable = (OlapTable) table;
Collection<Partition> allPartitions = olapTable.getAllPartitions();
for (Partition partition : allPartitions) {
addRecycledTabletsForPartition(recycledTabletSet, partition);
}
}
}
private void addRecycledTabletsForPartition(Set<Long> recycledTabletSet, Partition partition) {
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
recycledTabletSet.add(tablet.getId());
}
}
}
public synchronized boolean recycleDatabase(Database db, Set<String> tableNames, Set<Long> tableIds,
boolean isReplay, boolean isForceDrop, long replayRecycleTime) {
long recycleTime = 0;
if (idToDatabase.containsKey(db.getId())) {
LOG.error("db[{}] already in recycle bin.", db.getId());
return false;
}
        // db should be empty; all of its tables were recycled before this point
Preconditions.checkState(db.getTables().isEmpty());
// recycle db
RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames, tableIds);
idToDatabase.put(db.getId(), databaseInfo);
if (isForceDrop) {
            // A force-dropped database should be erased as soon as possible, so use recycle time 0.
recycleTime = 0;
} else if (!isReplay || replayRecycleTime == 0) {
recycleTime = System.currentTimeMillis();
} else {
recycleTime = replayRecycleTime;
}
idToRecycleTime.put(db.getId(), recycleTime);
LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), db.getFullName(), isForceDrop);
return true;
}
public synchronized boolean recycleTable(long dbId, Table table, boolean isReplay,
boolean isForceDrop, long replayRecycleTime) {
long recycleTime = 0;
if (idToTable.containsKey(table.getId())) {
LOG.error("table[{}] already in recycle bin.", table.getId());
return false;
}
// recycle table
RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
if (isForceDrop) {
            // A force-dropped table should be erased as soon as possible, so use recycle time 0.
recycleTime = 0;
} else if (!isReplay || replayRecycleTime == 0) {
recycleTime = System.currentTimeMillis();
} else {
recycleTime = replayRecycleTime;
}
idToRecycleTime.put(table.getId(), recycleTime);
idToTable.put(table.getId(), tableInfo);
LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), table.getName(), isForceDrop);
return true;
}
public synchronized boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition,
Range<PartitionKey> range, PartitionItem listPartitionItem,
DataProperty dataProperty, ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean isMutable) {
if (idToPartition.containsKey(partition.getId())) {
LOG.error("partition[{}] already in recycle bin.", partition.getId());
return false;
}
// recycle partition
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition,
range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable);
idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
idToPartition.put(partition.getId(), partitionInfo);
LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(),
tableId, tableName);
return true;
}
public synchronized Long getRecycleTimeById(long id) {
return idToRecycleTime.get(id);
}
public synchronized void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
idToRecycleTime.put(id, recycleTime);
}
public synchronized boolean isRecycleDatabase(long dbId) {
return idToDatabase.containsKey(dbId);
}
public synchronized boolean isRecycleTable(long dbId, long tableId) {
return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
}
public synchronized boolean isRecyclePartition(long dbId, long tableId, long partitionId) {
return isRecycleTable(dbId, tableId) || idToPartition.containsKey(partitionId);
}
public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long> partitionIds) {
dbIds.addAll(idToDatabase.keySet());
tableIds.addAll(idToTable.keySet());
partitionIds.addAll(idToPartition.keySet());
}
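    // An entry is expired only when (a) the minimum erase latency has passed (unless
    // Config.catalog_trash_ignore_min_erase_latency is set) and (b) it has stayed in the
    // recycle bin longer than Config.catalog_trash_expire_second.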
private synchronized boolean isExpire(long id, long currentTimeMs) {
long latency = currentTimeMs - idToRecycleTime.get(id);
return (Config.catalog_trash_ignore_min_erase_latency || latency > minEraseLatency)
&& latency > Config.catalog_trash_expire_second * 1000L;
}
private synchronized void eraseDatabase(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
// 1. erase expired database
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIter = idToDatabase.entrySet().iterator();
while (dbIter.hasNext()) {
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIter.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
if (isExpire(db.getId(), currentTimeMs)) {
// erase db
dbIter.remove();
idToRecycleTime.remove(entry.getKey());
Env.getCurrentEnv().eraseDatabase(db.getId(), true);
LOG.info("erase db[{}]", db.getId());
eraseNum++;
}
}
            // 2. erase databases exceeding the same-name keep number (a negative keepNum means no limit)
if (keepNum < 0) {
return;
}
Set<String> dbNames = idToDatabase.values().stream().map(d -> d.getDb().getFullName())
.collect(Collectors.toSet());
for (String dbName : dbNames) {
eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum);
}
} finally {
watch.stop();
LOG.info("eraseDatabase eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
}
}
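    // Collect the ids of recycled databases named dbName, keep the newest
    // maxSameNameTrashNum of them by recycle time, and return the rest for erasure.
    // Each inner list is an [id, recycleTime] pair.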
private synchronized List<Long> getSameNameDbIdListToErase(String dbName, int maxSameNameTrashNum) {
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = idToDatabase.entrySet().iterator();
List<List<Long>> dbRecycleTimeLists = Lists.newArrayList();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
if (db.getFullName().equals(dbName)) {
List<Long> dbRecycleTimeInfo = Lists.newArrayList();
dbRecycleTimeInfo.add(entry.getKey());
dbRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey()));
dbRecycleTimeLists.add(dbRecycleTimeInfo);
}
}
List<Long> dbIdToErase = Lists.newArrayList();
if (dbRecycleTimeLists.size() <= maxSameNameTrashNum) {
return dbIdToErase;
}
// order by recycle time desc
        dbRecycleTimeLists.sort((x, y) -> Long.compare(y.get(1), x.get(1)));
for (int i = maxSameNameTrashNum; i < dbRecycleTimeLists.size(); i++) {
dbIdToErase.add(dbRecycleTimeLists.get(i).get(0));
}
return dbIdToErase;
}
private synchronized void eraseDatabaseWithSameName(String dbName, long currentTimeMs, int maxSameNameTrashNum) {
List<Long> dbIdToErase = getSameNameDbIdListToErase(dbName, maxSameNameTrashNum);
for (Long dbId : dbIdToErase) {
RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
if (!isExpireMinLatency(dbId, currentTimeMs)) {
continue;
}
eraseAllTables(dbInfo);
idToDatabase.remove(dbId);
idToRecycleTime.remove(dbId);
Env.getCurrentEnv().eraseDatabase(dbId, true);
LOG.info("erase database[{}] name: {}", dbId, dbName);
}
}
private synchronized boolean isExpireMinLatency(long id, long currentTimeMs) {
return (currentTimeMs - idToRecycleTime.get(id)) > minEraseLatency;
}
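    // Erase every recycled table belonging to the given recycled database, matching by
    // dbId plus the table name/id snapshots kept in RecycleDatabaseInfo.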
private void eraseAllTables(RecycleDatabaseInfo dbInfo) {
Database db = dbInfo.getDb();
Set<String> tableNames = Sets.newHashSet(dbInfo.getTableNames());
Set<Long> tableIds = Sets.newHashSet(dbInfo.getTableIds());
long dbId = db.getId();
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
while (iterator.hasNext() && !tableNames.isEmpty()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName())
|| !tableIds.contains(tableInfo.getTable().getId())) {
continue;
}
Table table = tableInfo.getTable();
if (table.isManagedTable()) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false);
}
iterator.remove();
idToRecycleTime.remove(table.getId());
tableNames.remove(table.getName());
Env.getCurrentEnv().getEditLog().logEraseTable(table.getId());
LOG.info("erase db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
}
}
public synchronized void replayEraseDatabase(long dbId) {
idToDatabase.remove(dbId);
idToRecycleTime.remove(dbId);
Env.getCurrentEnv().eraseDatabase(dbId, false);
LOG.info("replay erase db[{}]", dbId);
}
private synchronized void eraseTable(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
// 1. erase expired tables
Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = idToTable.entrySet().iterator();
while (tableIter.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
RecycleTableInfo tableInfo = entry.getValue();
Table table = tableInfo.getTable();
long tableId = table.getId();
if (isExpire(tableId, currentTimeMs)) {
if (table.isManagedTable()) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false);
}
// erase table
tableIter.remove();
idToRecycleTime.remove(tableId);
// log
Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
LOG.info("erase table[{}]", tableId);
eraseNum++;
}
} // end for tables
            // 2. erase tables exceeding the same-name keep number (a negative keepNum means no limit)
if (keepNum < 0) {
return;
}
            Map<Long, Set<String>> dbId2TableNames = Maps.newHashMap();
            for (RecycleTableInfo tableInfo : idToTable.values()) {
                dbId2TableNames.computeIfAbsent(tableInfo.dbId, k -> Sets.newHashSet())
                        .add(tableInfo.getTable().getName());
            }
for (Map.Entry<Long, Set<String>> entry : dbId2TableNames.entrySet()) {
for (String tblName : entry.getValue()) {
eraseTableWithSameName(entry.getKey(), tblName, currentTimeMs, keepNum);
}
}
} finally {
watch.stop();
LOG.info("eraseTable eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
}
}
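    // Same keep-the-newest policy as getSameNameDbIdListToErase, applied to the tables of one db.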
private synchronized List<Long> getSameNameTableIdListToErase(long dbId, String tableName,
int maxSameNameTrashNum) {
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
List<List<Long>> tableRecycleTimeLists = Lists.newArrayList();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() != dbId) {
continue;
}
Table table = tableInfo.getTable();
if (table.getName().equals(tableName)) {
List<Long> tableRecycleTimeInfo = Lists.newArrayList();
tableRecycleTimeInfo.add(entry.getKey());
tableRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey()));
tableRecycleTimeLists.add(tableRecycleTimeInfo);
}
}
List<Long> tableIdToErase = Lists.newArrayList();
if (tableRecycleTimeLists.size() <= maxSameNameTrashNum) {
return tableIdToErase;
}
// order by recycle time desc
        tableRecycleTimeLists.sort((x, y) -> Long.compare(y.get(1), x.get(1)));
for (int i = maxSameNameTrashNum; i < tableRecycleTimeLists.size(); i++) {
tableIdToErase.add(tableRecycleTimeLists.get(i).get(0));
}
return tableIdToErase;
}
private synchronized void eraseTableWithSameName(long dbId, String tableName, long currentTimeMs,
int maxSameNameTrashNum) {
List<Long> tableIdToErase = getSameNameTableIdListToErase(dbId, tableName, maxSameNameTrashNum);
for (Long tableId : tableIdToErase) {
RecycleTableInfo tableInfo = idToTable.get(tableId);
if (!isExpireMinLatency(tableId, currentTimeMs)) {
continue;
}
Table table = tableInfo.getTable();
if (table.isManagedTable()) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false);
}
idToTable.remove(tableId);
idToRecycleTime.remove(tableId);
Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
LOG.info("erase table[{}] name: {} from db[{}]", tableId, tableName, dbId);
}
}
public synchronized void replayEraseTable(long tableId) {
LOG.info("before replay erase table[{}]", tableId);
RecycleTableInfo tableInfo = idToTable.remove(tableId);
idToRecycleTime.remove(tableId);
if (tableInfo == null) {
            // FIXME(walter): in 'DROP DB ... FORCE', `eraseTable` may sometimes be executed
            // before the drop db itself has finished, especially when the db has many tables.
return;
}
Table table = tableInfo.getTable();
if (table.isManagedTable()) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, true);
}
LOG.info("replay erase table[{}]", tableId);
}
private synchronized void erasePartition(long currentTimeMs, int keepNum) {
int eraseNum = 0;
StopWatch watch = StopWatch.createStarted();
try {
// 1. erase expired partitions
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
long partitionId = entry.getKey();
if (isExpire(partitionId, currentTimeMs)) {
Env.getCurrentEnv().onErasePartition(partition);
// erase partition
iterator.remove();
idToRecycleTime.remove(partitionId);
// log
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase partition[{}]. reason: expired", partitionId);
eraseNum++;
}
} // end for partitions
            // 2. erase partitions exceeding the same-name keep number (a negative keepNum means no limit)
if (keepNum < 0) {
return;
}
com.google.common.collect.Table<Long, Long, Set<String>> dbTblId2PartitionNames = HashBasedTable.create();
for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
Set<String> partitionNames = dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId);
if (partitionNames == null) {
partitionNames = Sets.newHashSet();
dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames);
}
partitionNames.add(partitionInfo.getPartition().getName());
}
for (Cell<Long, Long, Set<String>> cell : dbTblId2PartitionNames.cellSet()) {
for (String partitionName : cell.getValue()) {
erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), partitionName, currentTimeMs,
keepNum);
}
}
} finally {
watch.stop();
LOG.info("erasePartition eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
}
}
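    // Same keep-the-newest policy as getSameNameDbIdListToErase, applied to the partitions of one table.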
private synchronized List<Long> getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName,
int maxSameNameTrashNum) {
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
List<List<Long>> partitionRecycleTimeLists = Lists.newArrayList();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) {
continue;
}
Partition partition = partitionInfo.getPartition();
if (partition.getName().equals(partitionName)) {
List<Long> partitionRecycleTimeInfo = Lists.newArrayList();
partitionRecycleTimeInfo.add(entry.getKey());
partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey()));
partitionRecycleTimeLists.add(partitionRecycleTimeInfo);
}
}
List<Long> partitionIdToErase = Lists.newArrayList();
if (partitionRecycleTimeLists.size() <= maxSameNameTrashNum) {
return partitionIdToErase;
}
// order by recycle time desc
        partitionRecycleTimeLists.sort((x, y) -> Long.compare(y.get(1), x.get(1)));
for (int i = maxSameNameTrashNum; i < partitionRecycleTimeLists.size(); i++) {
partitionIdToErase.add(partitionRecycleTimeLists.get(i).get(0));
}
return partitionIdToErase;
}
private synchronized void erasePartitionWithSameName(long dbId, long tableId, String partitionName,
long currentTimeMs, int maxSameNameTrashNum) {
List<Long> partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName,
maxSameNameTrashNum);
for (Long partitionId : partitionIdToErase) {
RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
if (!isExpireMinLatency(partitionId, currentTimeMs)) {
continue;
}
Partition partition = partitionInfo.getPartition();
Env.getCurrentEnv().onErasePartition(partition);
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", partitionId, partitionName, tableId,
dbId);
}
}
public synchronized void replayErasePartition(long partitionId) {
RecyclePartitionInfo partitionInfo = idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
if (partitionInfo == null) {
LOG.warn("replayErasePartition: partitionInfo is null for partitionId[{}]", partitionId);
return;
}
Partition partition = partitionInfo.getPartition();
Env.getCurrentEnv().onErasePartition(partition);
LOG.info("replay erase partition[{}]", partitionId);
}
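    // Recover a dropped database. A dbId of -1 means "the most recently dropped database
    // with this name" (presumably what a plain RECOVER DATABASE statement passes); an
    // explicit dbId must match exactly.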
public synchronized Database recoverDatabase(String dbName, long dbId) throws DdlException {
RecycleDatabaseInfo dbInfo = null;
        // The recycle time of force-dropped tables and databases is set to zero;
        // starting from 1 here skips those entries.
long recycleTime = 1;
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = idToDatabase.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
if (dbName.equals(entry.getValue().getDb().getFullName())) {
if (dbId == -1) {
if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
recycleTime = idToRecycleTime.get(entry.getKey());
dbInfo = entry.getValue();
}
} else if (entry.getKey() == dbId) {
dbInfo = entry.getValue();
break;
}
}
}
if (dbInfo == null) {
throw new DdlException("Unknown database '" + dbName + "' or database id '" + dbId + "'");
}
// 1. recover all tables in this db
recoverAllTables(dbInfo);
Database db = dbInfo.getDb();
// 2. remove db from idToDatabase and idToRecycleTime
idToDatabase.remove(db.getId());
idToRecycleTime.remove(db.getId());
return db;
}
public synchronized Database replayRecoverDatabase(long dbId) {
RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
try {
recoverAllTables(dbInfo);
} catch (DdlException e) {
            // should not happen
LOG.error("failed replay recover database: {}", dbId, e);
}
idToDatabase.remove(dbId);
idToRecycleTime.remove(dbId);
return dbInfo.getDb();
}
private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws DdlException {
Database db = dbInfo.getDb();
Set<String> tableNames = Sets.newHashSet(dbInfo.getTableNames());
Set<Long> tableIds = Sets.newHashSet(dbInfo.getTableIds());
long dbId = db.getId();
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
while (iterator.hasNext() && !tableNames.isEmpty()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName())
|| !tableIds.contains(tableInfo.getTable().getId())) {
continue;
}
Table table = tableInfo.getTable();
if (table.getType() == TableType.OLAP) {
db.registerTable(table);
LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
} else {
LOG.info("ignore recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
}
iterator.remove();
idToRecycleTime.remove(table.getId());
tableNames.remove(table.getName());
}
if (!tableNames.isEmpty()) {
throw new DdlException("Tables[" + tableNames + "] is missing. Can not recover db");
}
}
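    // Recover a dropped table into db. A tableId of -1 means "the most recently dropped
    // table with this name"; force-dropped entries are skipped because their recycle time
    // is 0 and the search starts from 1.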
public synchronized boolean recoverTable(Database db, String tableName, long tableId,
String newTableName) throws DdlException {
        // the caller must hold the db lock
Table table = null;
        // The recycle time of force-dropped tables and databases is set to zero;
        // starting from 1 here skips those entries.
long recycleTime = 1;
long dbId = db.getId();
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() != dbId) {
continue;
}
if (!tableInfo.getTable().getName().equals(tableName)) {
continue;
}
if (tableId == -1) {
if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
recycleTime = idToRecycleTime.get(entry.getKey());
table = tableInfo.getTable();
}
} else if (entry.getKey() == tableId) {
table = tableInfo.getTable();
break;
}
}
if (table == null) {
throw new DdlException("Unknown table '" + tableName + "' or table id '" + tableId + "' in "
+ db.getFullName());
}
if (table.getType() == TableType.MATERIALIZED_VIEW) {
throw new DdlException("Can not recover materialized view '" + tableName + "' or table id '"
+ tableId + "' in " + db.getFullName());
}
innerRecoverTable(db, table, tableName, newTableName, null, false);
LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
return true;
}
public synchronized void replayRecoverTable(Database db, long tableId, String newTableName) throws DdlException {
        // the caller must hold the db write lock
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getTable().getId() != tableId) {
continue;
}
Preconditions.checkState(tableInfo.getDbId() == db.getId());
Table table = tableInfo.getTable();
String tableName = table.getName();
if (innerRecoverTable(db, table, tableName, newTableName, iterator, true)) {
break;
}
}
}
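    // When isReplay is true the caller passes its live iterator so the entry is removed via
    // iterator.remove(); otherwise the entry is removed from idToTable directly.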
private synchronized boolean innerRecoverTable(Database db, Table table, String tableName, String newTableName,
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator,
boolean isReplay) throws DdlException {
table.writeLock();
try {
if (!Strings.isNullOrEmpty(newTableName)) {
if (Env.isStoredTableNamesLowerCase()) {
newTableName = newTableName.toLowerCase();
}
if (!tableName.equals(newTableName)) {
// check if name is already used
if (db.getTable(newTableName).isPresent()) {
throw new DdlException("Table name[" + newTableName + "] is already used");
}
if (table.isManagedTable()) {
// olap table should also check if any rollup has same name as "newTableName"
((OlapTable) table).checkAndSetName(newTableName, false);
} else {
table.setName(newTableName);
}
}
}
db.registerTable(table);
if (isReplay) {
iterator.remove();
} else {
idToTable.remove(table.getId());
}
idToRecycleTime.remove(table.getId());
if (isReplay) {
LOG.info("replay recover table[{}]", table.getId());
} else {
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(),
-1L, "", table.getName(), newTableName, "", "");
Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
}
            // Only OLAP tables need to re-register dynamic partition settings; other tables (jdbc, odbc, view, ...) do not.
if (table.isManagedTable()) {
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), (OlapTable) table, isReplay);
}
} finally {
table.writeUnlock();
}
return true;
}
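    // Recover a dropped partition into table. A partitionIdToRecover of -1 means "the most
    // recently dropped partition with this name"; an explicit id must match exactly.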
public synchronized void recoverPartition(long dbId, OlapTable table, String partitionName,
long partitionIdToRecover, String newPartitionName) throws DdlException {
if (table.getType() == TableType.MATERIALIZED_VIEW) {
throw new DdlException("Can not recover partition in materialized view: " + table.getName());
}
long recycleTime = -1;
        // the caller must hold the db write lock
RecyclePartitionInfo recoverPartitionInfo = null;
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getTableId() != table.getId()) {
continue;
}
if (!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
continue;
}
if (partitionIdToRecover == -1) {
if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
recycleTime = idToRecycleTime.get(entry.getKey());
recoverPartitionInfo = partitionInfo;
}
} else if (entry.getKey() == partitionIdToRecover) {
recoverPartitionInfo = partitionInfo;
break;
}
}
if (recoverPartitionInfo == null) {
throw new DdlException("No partition named '" + partitionName + "' or partition id '" + partitionIdToRecover
+ "' in table " + table.getName());
}
PartitionInfo partitionInfo = table.getPartitionInfo();
PartitionItem recoverItem = null;
if (partitionInfo.getType() == PartitionType.RANGE) {
recoverItem = new RangePartitionItem(recoverPartitionInfo.getRange());
} else if (partitionInfo.getType() == PartitionType.LIST) {
recoverItem = recoverPartitionInfo.getListPartitionItem();
}
        // check whether the partition item conflicts with any existing partition
if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
throw new DdlException("Can not recover partition[" + partitionName + "]. Partition item conflict.");
}
        // check whether the table schema has changed since the partition was dropped
Partition recoverPartition = recoverPartitionInfo.getPartition();
Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
Set<Long> partitionIndex = recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
.map(i -> i.getId()).collect(Collectors.toSet());
if (!tableIndex.equals(partitionIndex)) {
throw new DdlException("table's index not equal with partition's index. table's index=" + tableIndex
+ ", partition's index=" + partitionIndex);
}
        // check partition names: the recovered name must match, and the new name must not be in use
Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
}
recoverPartition.setName(newPartitionName);
}
// recover partition
table.addPartition(recoverPartition);
// recover partition info
long partitionId = recoverPartition.getId();
partitionInfo.setItem(partitionId, false, recoverItem);
partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty());
partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc());
partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory());
partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable());
// remove from recycle bin
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "",
table.getName(), "", partitionName, newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}
    // The caller must hold the table's write lock
public synchronized void replayRecoverPartition(OlapTable table, long partitionId,
String newPartitionName) throws DdlException {
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
if (recyclePartitionInfo.getPartition().getId() != partitionId) {
continue;
}
Preconditions.checkState(recyclePartitionInfo.getTableId() == table.getId());
if (!Strings.isNullOrEmpty(newPartitionName)) {
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
}
}
table.addPartition(recyclePartitionInfo.getPartition());
if (!Strings.isNullOrEmpty(newPartitionName)) {
table.renamePartition(recyclePartitionInfo.getPartition().getName(), newPartitionName);
}
PartitionInfo partitionInfo = table.getPartitionInfo();
PartitionItem recoverItem = null;
if (partitionInfo.getType() == PartitionType.RANGE) {
recoverItem = new RangePartitionItem(recyclePartitionInfo.getRange());
} else if (partitionInfo.getType() == PartitionType.LIST) {
recoverItem = recyclePartitionInfo.getListPartitionItem();
}
partitionInfo.setItem(partitionId, false, recoverItem);
partitionInfo.setDataProperty(partitionId, recyclePartitionInfo.getDataProperty());
partitionInfo.setReplicaAllocation(partitionId, recyclePartitionInfo.getReplicaAlloc());
partitionInfo.setIsInMemory(partitionId, recyclePartitionInfo.isInMemory());
partitionInfo.setIsMutable(partitionId, recyclePartitionInfo.isMutable());
iterator.remove();
idToRecycleTime.remove(partitionId);
LOG.info("replay recover partition[{}]", partitionId);
break;
}
}
// erase database in catalog recycle bin instantly
public synchronized void eraseDatabaseInstantly(long dbId) throws DdlException {
// 1. find dbInfo and erase db
RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
if (dbInfo != null) {
// erase db
Env.getCurrentEnv().eraseDatabase(dbId, true);
// erase db from idToDatabase and idToRecycleTime
idToDatabase.remove(dbId);
idToRecycleTime.remove(dbId);
// log for erase db
String dbName = dbInfo.getDb().getName();
LOG.info("erase db[{}]: {}", dbId, dbName);
}
// 2. remove all tables with the same dbId
List<Long> tableIdToErase = Lists.newArrayList();
Iterator<Map.Entry<Long, RecycleTableInfo>> tableIterator = idToTable.entrySet().iterator();
while (tableIterator.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = tableIterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() == dbId) {
tableIdToErase.add(entry.getKey());
}
}
for (Long tableId : tableIdToErase) {
eraseTableInstantly(tableId);
}
// 3. remove all partitions with the same dbId
List<Long> partitionIdToErase = Lists.newArrayList();
Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator = idToPartition.entrySet().iterator();
while (partitionIterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = partitionIterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getDbId() == dbId) {
partitionIdToErase.add(entry.getKey());
}
}
for (Long partitionId : partitionIdToErase) {
erasePartitionInstantly(partitionId);
}
        // 4. throw if nothing was erased at all
if (dbInfo == null && tableIdToErase.isEmpty() && partitionIdToErase.isEmpty()) {
throw new DdlException("Unknown database id '" + dbId + "'");
}
}
// erase table in catalog recycle bin instantly
public synchronized void eraseTableInstantly(long tableId) throws DdlException {
// 1. find tableInfo and erase table
RecycleTableInfo tableInfo = idToTable.get(tableId);
if (tableInfo != null) {
// erase table
long dbId = tableInfo.getDbId();
Table table = tableInfo.getTable();
if (table.getType() == TableType.OLAP || table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false);
}
// erase table from idToTable and idToRecycleTime
idToTable.remove(tableId);
idToRecycleTime.remove(tableId);
// log for erase table
String tableName = table.getName();
Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId, tableName);
}
// 2. erase all partitions with the same tableId
List<Long> partitionIdToErase = Lists.newArrayList();
Iterator<Map.Entry<Long, RecyclePartitionInfo>> partitionIterator = idToPartition.entrySet().iterator();
while (partitionIterator.hasNext()) {
Map.Entry<Long, RecyclePartitionInfo> entry = partitionIterator.next();
RecyclePartitionInfo partitionInfo = entry.getValue();
if (partitionInfo.getTableId() == tableId) {
partitionIdToErase.add(entry.getKey());
}
}
for (Long partitionId : partitionIdToErase) {
erasePartitionInstantly(partitionId);
}
        // 3. throw if nothing was erased at all
if (tableInfo == null && partitionIdToErase.isEmpty()) {
throw new DdlException("Unknown table id '" + tableId + "'");
}
}
// erase partition in catalog recycle bin instantly
public synchronized void erasePartitionInstantly(long partitionId) throws DdlException {
// 1. find partitionInfo to erase
RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
if (partitionInfo == null) {
throw new DdlException("No partition id '" + partitionId + "'");
}
// 2. erase partition
Partition partition = partitionInfo.getPartition();
Env.getCurrentEnv().onErasePartition(partition);
// 3. erase partition in idToPartition and idToRecycleTime
idToPartition.remove(partitionId);
idToRecycleTime.remove(partitionId);
// 4. log for erase partition
long tableId = partitionInfo.getTableId();
String partitionName = partition.getName();
Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
LOG.info("erase table[{}]'s partition[{}]: {}", tableId, partitionId, partitionName);
}
    // No need for synchronized: this is only called while loading an image.
public void addTabletToInvertedIndex() {
        // No need to handle idToDatabase: a database is already empty when it is put here.
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
// idToTable
for (RecycleTableInfo tableInfo : idToTable.values()) {
Table table = tableInfo.getTable();
if (!table.isManagedTable()) {
continue;
}
long dbId = tableInfo.getDbId();
OlapTable olapTable = (OlapTable) table;
long tableId = olapTable.getId();
for (Partition partition : olapTable.getAllPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
} // end for indices
} // end for partitions
}
// idToPartition
for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
long dbId = partitionInfo.getDbId();
long tableId = partitionInfo.getTableId();
Partition partition = partitionInfo.getPartition();
long partitionId = partition.getId();
            // We need the olap table to obtain the schema hash info.
            // Look it up in the catalog first; if not found there, it should be in the recycle bin.
OlapTable olapTable = null;
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
                // the db should be in the recycle bin; otherwise just log an error
if (!idToDatabase.containsKey(dbId)) {
LOG.error("db[{}] is neither in catalog nor in recycle bin"
+ " when rebuilding inverted index from recycle bin, partition[{}]",
dbId, partitionId);
continue;
}
} else {
olapTable = (OlapTable) db.getTableNullable(tableId);
}
if (olapTable == null) {
if (!idToTable.containsKey(tableId)) {
LOG.error("table[{}] is neither in catalog nor in recycle bin"
+ " when rebuilding inverted index from recycle bin, partition[{}]",
tableId, partitionId);
continue;
}
RecycleTableInfo tableInfo = idToTable.get(tableId);
olapTable = (OlapTable) tableInfo.getTable();
}
Preconditions.checkNotNull(olapTable);
            // The storage medium must be taken from RecyclePartitionInfo, not from the olap
            // table, because the olap table no longer contains this partition.
TStorageMedium medium = partitionInfo.getDataProperty().getStorageMedium();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
} // end for indices
}
}
@Override
protected void runAfterCatalogReady() {
long currentTimeMs = System.currentTimeMillis();
        // Must erase in partition/table/db order, in case a partition (table) is still in
        // the recycle bin while its table (db) has already been erased.
int keepNum = Config.max_same_name_catalog_trash_num;
erasePartition(currentTimeMs, keepNum);
eraseTable(currentTimeMs, keepNum);
eraseDatabase(currentTimeMs, keepNum);
}
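    // Build the rows shown to users (e.g. via SHOW CATALOG RECYCLE BIN). Column layout:
    // Type, Name, DbId, TableId, PartitionId, DropTime, DataSize, RemoteDataSize, with
    // unused id columns left empty. Each type's rows are sorted by Name, then DropTime,
    // and the output lists databases, then tables, then partitions.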
public synchronized List<List<String>> getInfo() {
Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
List<List<String>> tableInfos = Lists.newArrayList();
for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Table");
RecycleTableInfo tableInfo = entry.getValue();
Table table = tableInfo.getTable();
info.add(table.getName());
info.add(String.valueOf(tableInfo.getDbId()));
info.add(String.valueOf(entry.getKey()));
info.add("");
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
// data size
long dataSize = table.getDataSize(false);
info.add(DebugUtil.printByteWithUnit(dataSize));
// remote data size
long remoteDataSize = table instanceof OlapTable ? ((OlapTable) table).getRemoteDataSize() : 0L;
info.add(DebugUtil.printByteWithUnit(remoteDataSize));
// calculate database data size
dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
if (v == null) {
return Pair.of(dataSize, remoteDataSize);
} else {
v.first += dataSize;
v.second += remoteDataSize;
return v;
}
});
tableInfos.add(info);
}
// sort by Name, DropTime
tableInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});
List<List<String>> partitionInfos = Lists.newArrayList();
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Partition");
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
info.add(partition.getName());
info.add(String.valueOf(partitionInfo.getDbId()));
info.add(String.valueOf(partitionInfo.getTableId()));
info.add(String.valueOf(entry.getKey()));
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
// data size
long dataSize = partition.getDataSize(false);
info.add(DebugUtil.printByteWithUnit(dataSize));
// remote data size
long remoteDataSize = partition.getRemoteDataSize();
info.add(DebugUtil.printByteWithUnit(remoteDataSize));
// calculate database data size
dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
if (v == null) {
return Pair.of(dataSize, remoteDataSize);
} else {
v.first += dataSize;
v.second += remoteDataSize;
return v;
}
});
partitionInfos.add(info);
}
// sort by Name, DropTime
partitionInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});
List<List<String>> dbInfos = Lists.newArrayList();
for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Database");
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
info.add(db.getFullName());
info.add(String.valueOf(entry.getKey()));
info.add("");
info.add("");
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
// data size
Pair<Long, Long> dataSizePair = dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
// remote data size
info.add(DebugUtil.printByteWithUnit(dataSizePair.second));
dbInfos.add(info);
}
// sort by Name, DropTime
dbInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});
return Stream.of(dbInfos, tableInfos, partitionInfos).flatMap(Collection::stream).collect(Collectors.toList());
}
public synchronized Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
RecycleTableInfo tableInfo = entry.getValue();
Table table = tableInfo.getTable();
if (!(table instanceof OlapTable)) {
continue;
}
long dataSize = table.getDataSize(false);
long remoteDataSize = ((OlapTable) table).getRemoteDataSize();
dbToRecycleSize.compute(tableInfo.getDbId(), (k, v) -> {
if (v == null) {
return Pair.of(dataSize, remoteDataSize);
} else {
v.first += dataSize;
v.second += remoteDataSize;
return v;
}
});
}
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
long dataSize = partition.getDataSize(false);
long remoteDataSize = partition.getRemoteDataSize();
dbToRecycleSize.compute(partitionInfo.getDbId(), (k, v) -> {
if (v == null) {
return Pair.of(dataSize, remoteDataSize);
} else {
v.first += dataSize;
v.second += remoteDataSize;
return v;
}
});
}
return dbToRecycleSize;
}
// Need to add "synchronized", because when calling /dump api to dump image,
// this class is not protected by any lock, will throw ConcurrentModificationException.
@Override
public synchronized void write(DataOutput out) throws IOException {
out.writeInt(idToDatabase.size());
for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
out.writeInt(idToTable.size());
for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
out.writeInt(idToPartition.size());
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
out.writeInt(idToRecycleTime.size());
for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
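    // Mirror of write(): read the four size-prefixed id maps, then the trailing gson json
    // that only carries the compatibility field.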
public void readFieldsWithGson(DataInput in) throws IOException {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecycleDatabaseInfo dbInfo = new RecycleDatabaseInfo();
dbInfo.readFields(in);
idToDatabase.put(id, dbInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecycleTableInfo tableInfo = new RecycleTableInfo();
tableInfo = tableInfo.read(in);
idToTable.put(id, tableInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo();
partitionInfo = partitionInfo.read(in);
idToPartition.put(id, partitionInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
long time = in.readLong();
idToRecycleTime.put(id, time);
}
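        // consume the trailing json written by write(); it only carries the unused compatibility field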
GsonUtils.GSON.fromJson(Text.readString(in), CatalogRecycleBin.class);
}
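    // Journal-version aware deserialization: before VERSION_136 use the legacy readFields
    // format, before VERSION_139 a pure gson image, and from VERSION_139 on the hybrid
    // format produced by write()/readFieldsWithGson().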
public static CatalogRecycleBin read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
CatalogRecycleBin bin = new CatalogRecycleBin();
bin.readFields(in);
return bin;
} else if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_139) {
return GsonUtils.GSON.fromJson(Text.readString(in), CatalogRecycleBin.class);
} else {
CatalogRecycleBin bin = new CatalogRecycleBin();
bin.readFieldsWithGson(in);
return bin;
}
}
@Override
public void gsonPostProcess() throws IOException {
updateDbInfoForLowerVersion();
}
@Deprecated
public void readFields(DataInput in) throws IOException {
int count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecycleDatabaseInfo dbInfo = new RecycleDatabaseInfo();
dbInfo.readFields(in);
idToDatabase.put(id, dbInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecycleTableInfo tableInfo = new RecycleTableInfo();
tableInfo.readFields(in);
idToTable.put(id, tableInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo();
partitionInfo.readFields(in);
idToPartition.put(id, partitionInfo);
}
count = in.readInt();
for (int i = 0; i < count; i++) {
long id = in.readLong();
long time = in.readLong();
idToRecycleTime.put(id, time);
}
updateDbInfoForLowerVersion();
}
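    // Metadata written before VERSION_114 has no table id sets in RecycleDatabaseInfo;
    // backfill them by matching dbId and table name against idToTable.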
private void updateDbInfoForLowerVersion() {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_114) {
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator = idToDatabase.entrySet().iterator();
while (dbIterator.hasNext()) {
Map.Entry<Long, RecycleDatabaseInfo> dbEntry = dbIterator.next();
RecycleDatabaseInfo dbInfo = dbEntry.getValue();
Set<String> tableNames = Sets.newHashSet(dbInfo.getTableNames());
Set<Long> tableIds = Sets.newHashSet();
Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
RecycleTableInfo tableInfo = entry.getValue();
if (tableInfo.getDbId() != dbEntry.getKey()
|| !tableNames.contains(tableInfo.getTable().getName())) {
continue;
}
tableIds.add(entry.getKey());
}
dbInfo.setTableIds(tableIds);
}
}
}
public class RecycleDatabaseInfo {
private Database db;
private Set<String> tableNames;
private Set<Long> tableIds;
// for compatibility in the future
@SerializedName("u")
private String unused;
public RecycleDatabaseInfo() {
tableNames = Sets.newHashSet();
tableIds = Sets.newHashSet();
}
public RecycleDatabaseInfo(Database db, Set<String> tableNames, Set<Long> tableIds) {
this.db = db;
this.tableNames = tableNames;
this.tableIds = tableIds;
}
public Database getDb() {
return db;
}
public Set<String> getTableNames() {
return tableNames;
}
public Set<Long> getTableIds() {
return tableIds;
}
        public void setTableIds(Set<Long> tableIds) {
            this.tableIds = tableIds;
        }
public void write(DataOutput out) throws IOException {
db.write(out);
out.writeInt(tableNames.size());
for (String tableName : tableNames) {
Text.writeString(out, tableName);
}
out.writeInt(tableIds.size());
for (Long tableId : tableIds) {
out.writeLong(tableId);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public void readFields(DataInput in) throws IOException {
db = Database.read(in);
int count = in.readInt();
for (int i = 0; i < count; i++) {
String tableName = Text.readString(in);
tableNames.add(tableName);
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_114) {
count = in.readInt();
for (int i = 0; i < count; i++) {
long tableId = in.readLong();
tableIds.add(tableId);
}
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_139) {
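                // consume the trailing json written by write(); it only carries the unused field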
GsonUtils.GSON.fromJson(Text.readString(in), RecycleDatabaseInfo.class);
}
}
}
public class RecycleTableInfo {
@SerializedName("did")
private long dbId;
@SerializedName("t")
private Table table;
public RecycleTableInfo() {
// for persist
}
public RecycleTableInfo(long dbId, Table table) {
this.dbId = dbId;
this.table = table;
}
public long getDbId() {
return dbId;
}
public Table getTable() {
return table;
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public RecycleTableInfo read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), RecycleTableInfo.class);
}
@Deprecated
public void readFields(DataInput in) throws IOException {
dbId = in.readLong();
table = Table.read(in);
}
}
public class RecyclePartitionInfo {
@SerializedName("did")
private long dbId;
@SerializedName("tid")
private long tableId;
@SerializedName("p")
private Partition partition;
@SerializedName("r")
private Range<PartitionKey> range;
@SerializedName("lpi")
private PartitionItem listPartitionItem;
@SerializedName("dp")
private DataProperty dataProperty;
@SerializedName("ra")
private ReplicaAllocation replicaAlloc;
@SerializedName("im")
private boolean isInMemory;
@SerializedName("mu")
private boolean isMutable = true;
public RecyclePartitionInfo() {
// for persist
}
public RecyclePartitionInfo(long dbId, long tableId, Partition partition,
Range<PartitionKey> range, PartitionItem listPartitionItem,
DataProperty dataProperty, ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean isMutable) {
this.dbId = dbId;
this.tableId = tableId;
this.partition = partition;
this.range = range;
this.listPartitionItem = listPartitionItem;
this.dataProperty = dataProperty;
this.replicaAlloc = replicaAlloc;
this.isInMemory = isInMemory;
this.isMutable = isMutable;
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
public Partition getPartition() {
return partition;
}
public Range<PartitionKey> getRange() {
return range;
}
public PartitionItem getListPartitionItem() {
return listPartitionItem;
}
public DataProperty getDataProperty() {
return dataProperty;
}
public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
public boolean isInMemory() {
return isInMemory;
}
public boolean isMutable() {
return isMutable;
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public RecyclePartitionInfo read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), RecyclePartitionInfo.class);
}
public void readFields(DataInput in) throws IOException {
dbId = in.readLong();
tableId = in.readLong();
partition = Partition.read(in);
range = RangeUtils.readRange(in);
listPartitionItem = ListPartitionItem.read(in);
dataProperty = DataProperty.read(in);
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
short replicationNum = in.readShort();
replicaAlloc = new ReplicaAllocation(replicationNum);
} else {
replicaAlloc = ReplicaAllocation.read(in);
}
isInMemory = in.readBoolean();
if (Config.isCloudMode()) {
                // HACK: the original cloud-mode implementation had code like:
                //
                //     isPersistent = in.readBoolean();
                //
                // consume the boolean here to keep compatibility.
in.readBoolean();
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
isMutable = in.readBoolean();
}
}
}
    // Currently only used when loading an image, so no synchronized protection is needed.
public List<Long> getAllDbIds() {
return Lists.newArrayList(idToDatabase.keySet());
}
}