// CatalogRecycleBin.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CatalogRecycleBin extends MasterDaemon implements Writable {
    private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
    // Daemon run interval in production (unit tests override to 10s in the ctor).
    private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
    // erase meta at least after minEraseLatency milliseconds
    // to avoid erase log ahead of drop log
    private static final long minEraseLatency = 10 * 60 * 1000;  // 10 min

    // Guards all recycle-bin state below: the idTo* maps and the name caches.
    // Readers snapshot under readLock(); all mutation happens under writeLock().
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private void readLock() {
        lock.readLock().lock();
    }

    private void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    // Recycled databases, keyed by database id.
    private ConcurrentHashMap<Long, RecycleDatabaseInfo> idToDatabase;
    // Recycled tables, keyed by table id.
    private ConcurrentHashMap<Long, RecycleTableInfo> idToTable;
    // Recycled partitions, keyed by partition id.
    private ConcurrentHashMap<Long, RecyclePartitionInfo> idToPartition;
    // Recycle timestamp (ms since epoch) for every recycled db/table/partition id.
    // A value of 0 marks a force-dropped entity, eligible for erasing immediately.
    private ConcurrentHashMap<Long, Long> idToRecycleTime;

    // Caches below to avoid recalculating meta with the same name every daemon run cycle.
    // When the meta is updated, these caches should be updated too. No need to
    // persist these caches because they can be recalculated when FE restarting.
    // String:<DbName> -> Set:<Db Ids with same name>
    Map<String, Set<Long>> dbNameToIds = new ConcurrentHashMap<>();
    // (DbId, TableName) -> Set:<Table Ids with same name>
    Map<Pair<Long, String>, Set<Long>> dbIdTableNameToIds = new ConcurrentHashMap<>();
    // (DbId, TblId) -> (PartitionName -> Set:<Partition Ids with same name>)
    Map<Pair<Long, Long>, Map<String, Set<Long>>> dbTblIdPartitionNameToIds = new ConcurrentHashMap<>();

    // for compatible in the future
    @SerializedName("u")
    String unused;

    /**
     * Creates the recycle-bin daemon. It runs every DEFAULT_INTERVAL_SECONDS in
     * production, and every 10 seconds under unit tests so expiry is observable quickly.
     */
    public CatalogRecycleBin() {
        super("recycle bin", FeConstants.runningUnitTest ? 10 * 1000L : DEFAULT_INTERVAL_SECONDS * 1000L);
        idToRecycleTime = new ConcurrentHashMap<>();
        idToDatabase = new ConcurrentHashMap<>();
        idToTable = new ConcurrentHashMap<>();
        idToPartition = new ConcurrentHashMap<>();
    }

    /**
     * Checks whether every tablet reported by a backend is covered by the recycle bin.
     *
     * <p>Gathers the tablet ids of all recycled partitions, recycled tables, and
     * the tables inside recycled databases, then verifies that set contains all
     * of {@code backendTabletIds}.
     *
     * @param backendTabletIds tablet ids reported by the backend
     * @return true iff all given tablets belong to recycled meta
     */
    public boolean allTabletsInRecycledStatus(List<Long> backendTabletIds) {
        readLock();
        try {
            Set<Long> recycledTablets = Sets.newHashSet();

            for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
                addRecycledTabletsForPartition(recycledTablets, partitionInfo.getPartition());
            }

            for (RecycleTableInfo tableInfo : idToTable.values()) {
                addRecycledTabletsForTable(recycledTablets, tableInfo.getTable());
            }

            for (RecycleDatabaseInfo dbInfo : idToDatabase.values()) {
                for (Table table : dbInfo.getDb().getTables()) {
                    addRecycledTabletsForTable(recycledTablets, table);
                }
            }

            // size check is a cheap shortcut before the containsAll scan
            return recycledTablets.size() >= backendTabletIds.size()
                    && recycledTablets.containsAll(backendTabletIds);
        } finally {
            readUnlock();
        }
    }

    /**
     * Collects the tablet ids of every partition of the given table.
     * Non-managed (non-OLAP) tables own no tablets and are skipped.
     */
    private void addRecycledTabletsForTable(Set<Long> recycledTabletSet, Table table) {
        if (!table.isManagedTable()) {
            return;
        }
        OlapTable olapTable = (OlapTable) table;
        for (Partition partition : olapTable.getAllPartitions()) {
            addRecycledTabletsForPartition(recycledTabletSet, partition);
        }
    }

    /**
     * Collects the tablet ids of all materialized indexes (visible and shadow)
     * of the given partition into {@code recycledTabletSet}.
     */
    private void addRecycledTabletsForPartition(Set<Long> recycledTabletSet, Partition partition) {
        partition.getMaterializedIndices(IndexExtState.ALL).stream()
                .flatMap(index -> index.getTablets().stream())
                .map(Tablet::getId)
                .forEach(recycledTabletSet::add);
    }

    /**
     * Moves a dropped (already emptied) database into the recycle bin.
     *
     * @param db                the database to recycle; must contain no tables
     * @param tableNames        names of the tables that were recycled with this db
     * @param tableIds          ids of the tables that were recycled with this db
     * @param isReplay          true when called during edit-log replay
     * @param isForceDrop       true for 'DROP ... FORCE' — erased immediately
     * @param replayRecycleTime recycle time from the edit log (0 means "use now")
     * @return false if a database with the same id is already in the bin
     * @throws IllegalStateException if the database still contains tables
     */
    public boolean recycleDatabase(Database db, Set<String> tableNames, Set<Long> tableIds,
                                                boolean isReplay, boolean isForceDrop, long replayRecycleTime) {
        writeLock();
        try {
            long dbId = db.getId();
            if (idToDatabase.containsKey(dbId)) {
                LOG.error("db[{}] already in recycle bin.", dbId);
                return false;
            }

            // db should be empty. all tables are recycled before
            if (!db.getTableIds().isEmpty()) {
                throw new IllegalStateException("Database " + db.getFullName() + " is not empty. Contains tables: "
                                                + db.getTableIds().stream().collect(Collectors.toSet()));
            }

            // Force-dropped databases get timestamp 0 so they are erased at once;
            // replay reuses the logged timestamp when one is present.
            long recycleTime;
            if (isForceDrop) {
                recycleTime = 0;
            } else if (isReplay && replayRecycleTime != 0) {
                recycleTime = replayRecycleTime;
            } else {
                recycleTime = System.currentTimeMillis();
            }

            idToDatabase.put(dbId, new RecycleDatabaseInfo(db, tableNames, tableIds));
            idToRecycleTime.put(dbId, recycleTime);
            dbNameToIds.computeIfAbsent(db.getFullName(), k -> ConcurrentHashMap.newKeySet()).add(dbId);
            LOG.info("recycle db[{}-{}], is force drop: {}", dbId, db.getFullName(), isForceDrop);
            return true;
        } finally {
            writeUnlock();
        }
    }

    /**
     * Moves a dropped table into the recycle bin.
     *
     * @param dbId              id of the database the table belonged to
     * @param table             the table to recycle
     * @param isReplay          true when called during edit-log replay
     * @param isForceDrop       true for 'DROP ... FORCE' — erased immediately
     * @param replayRecycleTime recycle time from the edit log (0 means "use now")
     * @return false if a table with the same id is already in the bin
     */
    public boolean recycleTable(long dbId, Table table, boolean isReplay,
                                             boolean isForceDrop, long replayRecycleTime) {
        writeLock();
        try {
            long tableId = table.getId();
            if (idToTable.containsKey(tableId)) {
                LOG.error("table[{}] already in recycle bin.", tableId);
                return false;
            }

            // Force-dropped tables get timestamp 0 so they are erased at once;
            // replay reuses the logged timestamp when one is present.
            long recycleTime;
            if (isForceDrop) {
                recycleTime = 0;
            } else if (isReplay && replayRecycleTime != 0) {
                recycleTime = replayRecycleTime;
            } else {
                recycleTime = System.currentTimeMillis();
            }

            idToTable.put(tableId, new RecycleTableInfo(dbId, table));
            idToRecycleTime.put(tableId, recycleTime);
            dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
                    k -> ConcurrentHashMap.newKeySet()).add(tableId);
            LOG.info("recycle table[{}-{}], is force drop: {}", tableId, table.getName(), isForceDrop);
            return true;
        } finally {
            writeUnlock();
        }
    }

    /**
     * Moves a dropped partition into the recycle bin, together with everything
     * needed to restore it later (range / list partition item, storage and
     * replica properties).
     *
     * @return false if a partition with the same id is already in the bin
     */
    public boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition,
                                                 Range<PartitionKey> range, PartitionItem listPartitionItem,
                                                 DataProperty dataProperty, ReplicaAllocation replicaAlloc,
                                                 boolean isInMemory, boolean isMutable) {
        writeLock();
        try {
            long partitionId = partition.getId();
            if (idToPartition.containsKey(partitionId)) {
                LOG.error("partition[{}] already in recycle bin.", partitionId);
                return false;
            }

            RecyclePartitionInfo recycleInfo = new RecyclePartitionInfo(dbId, tableId, partition,
                    range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable);
            idToPartition.put(partitionId, recycleInfo);
            idToRecycleTime.put(partitionId, System.currentTimeMillis());
            // name cache: (dbId, tableId) -> partition name -> ids sharing that name
            dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), k -> new ConcurrentHashMap<>())
                    .computeIfAbsent(partition.getName(), k -> ConcurrentHashMap.newKeySet()).add(partitionId);
            LOG.info("recycle partition[{}-{}] of table [{}-{}]", partitionId, partition.getName(),
                    tableId, tableName);
            return true;
        } finally {
            writeUnlock();
        }
    }

    // Returns the recycle timestamp (ms) for the given id, or null if the id is not in the bin.
    public Long getRecycleTimeById(long id) {
        return idToRecycleTime.get(id);
    }

    // Replay-only: overwrite the recycle time recorded in the edit log.
    // NOTE(review): a null recycleTime would throw NPE here (ConcurrentHashMap
    // rejects null values) — presumably replay always supplies non-null; verify.
    public void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
        idToRecycleTime.put(id, recycleTime);
    }

    public boolean isRecycleDatabase(long dbId) {
        return idToDatabase.containsKey(dbId);
    }

    // True if the table itself, or its whole database, is in the recycle bin.
    public boolean isRecycleTable(long dbId, long tableId) {
        return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
    }

    // True if the partition, its table, or its database is in the recycle bin.
    public boolean isRecyclePartition(long dbId, long tableId, long partitionId) {
        return isRecycleTable(dbId, tableId) || idToPartition.containsKey(partitionId);
    }

    // Adds all recycled db/table/partition ids into the caller-provided output sets.
    public void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long> partitionIds) {
        dbIds.addAll(idToDatabase.keySet());
        tableIds.addAll(idToTable.keySet());
        partitionIds.addAll(idToPartition.keySet());
    }

    /**
     * Whether the recycled entity has exceeded the trash retention and may be erased.
     *
     * <p>An entity is expired once its latency exceeds catalog_trash_expire_second,
     * and (unless catalog_trash_ignore_min_erase_latency is set) also exceeds
     * minEraseLatency, so the erase log cannot run ahead of the drop log.
     *
     * @param id            db/table/partition id
     * @param currentTimeMs current wall-clock time in ms
     * @return true if the entity is eligible for erasing; false if expired
     *         conditions are unmet or the id has no recorded recycle time
     */
    private boolean isExpire(long id, long currentTimeMs) {
        // Guard against a missing entry: unboxing a null Long from get(id)
        // would throw NPE if the id was erased concurrently.
        Long recycleTime = idToRecycleTime.get(id);
        if (recycleTime == null) {
            return false;
        }
        long latency = currentTimeMs - recycleTime;
        return (Config.catalog_trash_ignore_min_erase_latency || latency > minEraseLatency)
                && latency > Config.catalog_trash_expire_second * 1000L;
    }

    /**
     * Daemon step: permanently removes recycled databases.
     *
     * <p>First erases every database whose trash retention has expired, then
     * (when {@code keepNum >= 0}) trims each group of same-named databases down
     * to at most {@code keepNum} entries, oldest first.
     *
     * @param currentTimeMs current wall-clock time in ms
     * @param keepNum       max same-name trash entries to keep; negative disables trimming
     */
    private void eraseDatabase(long currentTimeMs, int keepNum) {
        int eraseNum = 0;
        StopWatch watch = StopWatch.createStarted();
        try {
            // 1. collect expired database IDs under read lock
            List<Long> expiredIds = new ArrayList<>();
            readLock();
            try {
                for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
                    if (isExpire(entry.getKey(), currentTimeMs)) {
                        expiredIds.add(entry.getKey());
                    }
                }
            } finally {
                readUnlock();
            }

            // 2. erase each expired database one at a time
            // (write lock is re-taken per id so other threads are not blocked for long)
            for (Long dbId : expiredIds) {
                writeLock();
                try {
                    // the db may have been removed since the snapshot above
                    RecycleDatabaseInfo dbInfo = idToDatabase.remove(dbId);
                    if (dbInfo == null) {
                        continue;
                    }
                    Database db = dbInfo.getDb();
                    idToRecycleTime.remove(dbId);

                    // drop this id from the name cache; delete the set once empty
                    dbNameToIds.computeIfPresent(db.getFullName(), (k, v) -> {
                        v.remove(db.getId());
                        return v.isEmpty() ? null : v;
                    });

                    Env.getCurrentEnv().eraseDatabase(db.getId(), true);
                    LOG.info("erase db[{}]", db.getId());
                    eraseNum++;
                } finally {
                    writeUnlock();
                }
            }

            // 3. erase exceed number
            if (keepNum < 0) {
                return;
            }
            // snapshot the same-name groups under read lock, then process without it
            List<Map.Entry<String, Set<Long>>> groups;
            readLock();
            try {
                groups = new ArrayList<>(dbNameToIds.entrySet());
            } finally {
                readUnlock();
            }
            for (Map.Entry<String, Set<Long>> entry : groups) {
                String dbName = entry.getKey();
                eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum, Lists.newArrayList(entry.getValue()));
            }
        } finally {
            watch.stop();
            LOG.info("eraseDatabase eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
        }
    }

    /**
     * Trims the recycled databases sharing {@code dbName} down to at most
     * {@code maxSameNameTrashNum}, erasing the ones with the oldest recycle time.
     * An entry is only erased once it has sat in the bin for minEraseLatency
     * (see {@link #isExpireMinLatency}).
     */
    private void eraseDatabaseWithSameName(String dbName, long currentTimeMs,
                                                        int maxSameNameTrashNum, List<Long> sameNameDbIdList) {
        List<Long> dbIdToErase;
        readLock();
        try {
            dbIdToErase = getIdListToEraseByRecycleTime(sameNameDbIdList, maxSameNameTrashNum);
        } finally {
            readUnlock();
        }
        for (Long dbId : dbIdToErase) {
            writeLock();
            try {
                RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
                // re-check under write lock: the db may be gone or still too fresh
                if (dbInfo == null || !isExpireMinLatency(dbId, currentTimeMs)) {
                    continue;
                }
                // the db's still-recycled tables must be erased together with it
                eraseAllTables(dbInfo);
                idToDatabase.remove(dbId);
                idToRecycleTime.remove(dbId);

                dbNameToIds.computeIfPresent(dbName, (k, v) -> {
                    v.remove(dbId);
                    return v.isEmpty() ? null : v;
                });

                Env.getCurrentEnv().eraseDatabase(dbId, true);
                LOG.info("erase database[{}] name: {}", dbId, dbName);
            } finally {
                writeUnlock();
            }
        }
    }

    /**
     * Whether the entity has sat in the bin for at least minEraseLatency,
     * so erasing it cannot outrun the corresponding drop log. Always true in
     * unit tests so erase behavior is observable without waiting.
     *
     * @return true if eligible; false when still too fresh or the id has no
     *         recorded recycle time
     */
    private boolean isExpireMinLatency(long id, long currentTimeMs) {
        if (FeConstants.runningUnitTest) {
            return true;
        }
        // Guard against a missing entry: unboxing a null Long would throw NPE.
        Long recycleTime = idToRecycleTime.get(id);
        if (recycleTime == null) {
            return false;
        }
        return (currentTimeMs - recycleTime) > minEraseLatency;
    }

    /**
     * Erases every recycled table that belongs to the given recycled database.
     * Must be called while holding the write lock (callers in this file do).
     *
     * <p>Tables are matched by dbId plus the name/id sets captured when the db
     * was recycled; the scan stops early once all captured names have been seen.
     * NOTE(review): only the table name is removed from {@code tableNames} per
     * match — presumably name/id pairs are one-to-one within a db; verify.
     */
    private void eraseAllTables(RecycleDatabaseInfo dbInfo) {
        Database db = dbInfo.getDb();
        Set<String> tableNames = Sets.newHashSet(dbInfo.getTableNames());
        Set<Long> tableIds = Sets.newHashSet(dbInfo.getTableIds());
        long dbId = db.getId();
        Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
        while (iterator.hasNext() && !tableNames.isEmpty()) {
            Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
            RecycleTableInfo tableInfo = entry.getValue();
            if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName())
                    || !tableIds.contains(tableInfo.getTable().getId())) {
                continue;
            }

            Table table = tableInfo.getTable();
            if (table.isManagedTable()) {
                Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table, false);
            }
            // remove through the iterator so the ongoing iteration stays valid
            iterator.remove();
            idToRecycleTime.remove(table.getId());
            tableNames.remove(table.getName());

            dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(), table.getName()), (k, v) -> {
                v.remove(table.getId());
                return v.isEmpty() ? null : v;
            });

            Env.getCurrentEnv().getEditLog().logEraseTable(table.getId());
            LOG.info("erase db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
        }
    }

    /**
     * Edit-log replay of a database erase: drops the db from the bin, its recycle
     * time, and the name cache, then lets Env finish the erase (non-logging path).
     */
    public void replayEraseDatabase(long dbId) {
        writeLock();
        try {
            RecycleDatabaseInfo removed = idToDatabase.remove(dbId);
            idToRecycleTime.remove(dbId);

            if (removed != null) {
                dbNameToIds.computeIfPresent(removed.getDb().getFullName(), (name, ids) -> {
                    ids.remove(dbId);
                    return ids.isEmpty() ? null : ids;
                });
            }

            Env.getCurrentEnv().eraseDatabase(dbId, false);
            LOG.info("replay erase db[{}]", dbId);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Daemon step: permanently removes recycled tables.
     *
     * <p>First erases every table whose trash retention has expired, then (when
     * {@code keepNum >= 0}) trims each (dbId, tableName) group down to at most
     * {@code keepNum} entries, oldest first.
     *
     * @param currentTimeMs current wall-clock time in ms
     * @param keepNum       max same-name trash entries to keep; negative disables trimming
     */
    private void eraseTable(long currentTimeMs, int keepNum) {
        int eraseNum = 0;
        StopWatch watch = StopWatch.createStarted();
        try {
            // 1. collect expired table IDs under read lock
            List<Long> expiredIds = new ArrayList<>();
            readLock();
            try {
                for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
                    if (isExpire(entry.getKey(), currentTimeMs)) {
                        expiredIds.add(entry.getKey());
                    }
                }
            } finally {
                readUnlock();
            }

            // 2. erase each expired table one at a time
            // (write lock is re-taken per id so other threads are not blocked for long)
            for (Long tableId : expiredIds) {
                writeLock();
                try {
                    // the table may have been removed since the snapshot above
                    RecycleTableInfo tableInfo = idToTable.remove(tableId);
                    if (tableInfo == null) {
                        continue;
                    }
                    Table table = tableInfo.getTable();
                    if (table.isManagedTable()) {
                        Env.getCurrentEnv().onEraseOlapTable(tableInfo.dbId, (OlapTable) table, false);
                    }

                    idToRecycleTime.remove(tableId);

                    // drop this id from the name cache; delete the set once empty
                    dbIdTableNameToIds.computeIfPresent(Pair.of(tableInfo.getDbId(), table.getName()),
                            (k, v) -> {
                            v.remove(tableId);
                            return v.isEmpty() ? null : v;
                        });

                    Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
                    LOG.info("erase table[{}]", tableId);
                    eraseNum++;
                } finally {
                    writeUnlock();
                }
            }

            // 3. erase exceed num
            if (keepNum < 0) {
                return;
            }
            // snapshot the same-name groups under read lock, then process without it
            List<Map.Entry<Pair<Long, String>, Set<Long>>> groups;
            readLock();
            try {
                groups = new ArrayList<>(dbIdTableNameToIds.entrySet());
            } finally {
                readUnlock();
            }
            for (Map.Entry<Pair<Long, String>, Set<Long>> entry : groups) {
                eraseTableWithSameName(entry.getKey().first, entry.getKey().second, currentTimeMs, keepNum,
                        Lists.newArrayList(entry.getValue()));
            }
        } finally {
            watch.stop();
            LOG.info("eraseTable eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
        }
    }

    /**
     * Trims the recycled tables sharing (dbId, tableName) down to at most
     * {@code maxSameNameTrashNum}, erasing the ones with the oldest recycle time.
     * An entry is only erased once it has sat in the bin for minEraseLatency.
     */
    private void eraseTableWithSameName(long dbId, String tableName, long currentTimeMs,
            int maxSameNameTrashNum, List<Long> sameNameTableIdList) {
        List<Long> tableIdToErase;
        readLock();
        try {
            tableIdToErase = getIdListToEraseByRecycleTime(sameNameTableIdList, maxSameNameTrashNum);
        } finally {
            readUnlock();
        }
        for (Long tableId : tableIdToErase) {
            writeLock();
            try {
                RecycleTableInfo tableInfo = idToTable.get(tableId);
                // re-check under write lock: the table may be gone or still too fresh
                if (tableInfo == null || !isExpireMinLatency(tableId, currentTimeMs)) {
                    continue;
                }
                Table table = tableInfo.getTable();
                if (table.isManagedTable()) {
                    Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table, false);
                }

                idToTable.remove(tableId);
                idToRecycleTime.remove(tableId);

                dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, tableName), (k, v) -> {
                    v.remove(tableId);
                    return v.isEmpty() ? null : v;
                });

                Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
                LOG.info("erase table[{}] name: {} from db[{}]", tableId, tableName, dbId);
            } finally {
                writeUnlock();
            }
        }
    }

    /**
     * Edit-log replay of a table erase: drops the table from the bin, its recycle
     * time and the name cache, and releases its tablets via Env (replay path).
     */
    public void replayEraseTable(long tableId) {
        writeLock();
        try {
            LOG.info("before replay erase table[{}]", tableId);
            RecycleTableInfo removedInfo = idToTable.remove(tableId);
            idToRecycleTime.remove(tableId);
            if (removedInfo == null) {
                // FIXME(walter): Sometimes `eraseTable` in 'DROP DB ... FORCE' may be executed earlier than
                // finish drop db, especially in the case of drop db with many tables.
                return;
            }

            Table table = removedInfo.getTable();
            dbIdTableNameToIds.computeIfPresent(Pair.of(removedInfo.getDbId(), table.getName()),
                    (key, ids) -> {
                        ids.remove(tableId);
                        return ids.isEmpty() ? null : ids;
                    });

            if (table.isManagedTable()) {
                Env.getCurrentEnv().onEraseOlapTable(removedInfo.dbId, (OlapTable) table, true);
            }
            LOG.info("replay erase table[{}]", tableId);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Daemon step: permanently removes recycled partitions.
     *
     * <p>First erases every partition whose trash retention has expired, then
     * (when {@code keepNum >= 0}) trims each (dbId, tableId, partitionName)
     * group down to at most {@code keepNum} entries, oldest first.
     *
     * @param currentTimeMs current wall-clock time in ms
     * @param keepNum       max same-name trash entries to keep; negative disables trimming
     */
    private void erasePartition(long currentTimeMs, int keepNum) {
        int eraseNum = 0;
        StopWatch watch = StopWatch.createStarted();
        try {
            // 1. collect expired partition IDs under read lock
            List<Long> expiredIds = new ArrayList<>();
            readLock();
            try {
                for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
                    if (isExpire(entry.getKey(), currentTimeMs)) {
                        expiredIds.add(entry.getKey());
                    }
                }
            } finally {
                readUnlock();
            }

            // 2. erase each expired partition one at a time (microbatch)
            for (Long partitionId : expiredIds) {
                writeLock();
                try {
                    // the partition may have been removed since the snapshot above
                    RecyclePartitionInfo partitionInfo = idToPartition.remove(partitionId);
                    if (partitionInfo == null) {
                        continue;
                    }
                    Partition partition = partitionInfo.getPartition();
                    Env.getCurrentEnv().onErasePartition(partition);
                    idToRecycleTime.remove(partitionId);

                    // drop from the two-level name cache, pruning empty inner maps/sets
                    dbTblIdPartitionNameToIds.computeIfPresent(
                            Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId()), (pair, partitionMap) -> {
                                partitionMap.computeIfPresent(partition.getName(), (name, idSet) -> {
                                    idSet.remove(partitionId);
                                    return idSet.isEmpty() ? null : idSet;
                                });
                                return partitionMap.isEmpty() ? null : partitionMap;
                            });

                    Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
                    LOG.info("erase partition[{}]. reason: expired", partitionId);
                    eraseNum++;
                } finally {
                    writeUnlock();
                }
            }

            // 3. erase exceed number
            if (keepNum < 0) {
                return;
            }
            // Collect same-name groups under read lock
            List<Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>>> groups;
            readLock();
            try {
                groups = new ArrayList<>(dbTblIdPartitionNameToIds.entrySet());
            } finally {
                readUnlock();
            }
            for (Map.Entry<Pair<Long, Long>, Map<String, Set<Long>>> entry : groups) {
                long dbId = entry.getKey().first;
                long tableId = entry.getKey().second;
                for (Map.Entry<String, Set<Long>> partitionEntry : entry.getValue().entrySet()) {
                    erasePartitionWithSameName(dbId, tableId, partitionEntry.getKey(), currentTimeMs, keepNum,
                            Lists.newArrayList(partitionEntry.getValue()));
                }
            }
        } finally {
            watch.stop();
            LOG.info("erasePartition eraseNum: {} cost: {}ms", eraseNum, watch.getTime());
        }
    }

    /**
     * Trims the recycled partitions sharing (dbId, tableId, partitionName) down
     * to at most {@code maxSameNameTrashNum}, erasing the ones with the oldest
     * recycle time. An entry is only erased after minEraseLatency.
     */
    private void erasePartitionWithSameName(long dbId, long tableId, String partitionName,
            long currentTimeMs, int maxSameNameTrashNum, List<Long> sameNamePartitionIdList) {
        List<Long> partitionIdToErase;
        readLock();
        try {
            partitionIdToErase = getIdListToEraseByRecycleTime(sameNamePartitionIdList, maxSameNameTrashNum);
        } finally {
            readUnlock();
        }
        for (Long partitionId : partitionIdToErase) {
            writeLock();
            try {
                RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId);
                // re-check under write lock: the partition may be gone or still too fresh
                if (partitionInfo == null || !isExpireMinLatency(partitionId, currentTimeMs)) {
                    continue;
                }
                Partition partition = partitionInfo.getPartition();

                Env.getCurrentEnv().onErasePartition(partition);
                idToPartition.remove(partitionId);
                idToRecycleTime.remove(partitionId);

                // drop from the two-level name cache, pruning empty inner maps/sets
                dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(dbId, tableId), (pair, partitionMap) -> {
                    partitionMap.computeIfPresent(partitionName, (name, idSet) -> {
                        idSet.remove(partitionId);
                        return idSet.isEmpty() ? null : idSet;
                    });
                    return partitionMap.isEmpty() ? null : partitionMap;
                });

                Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
                LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", partitionId,
                        partitionName, tableId, dbId);
            } finally {
                writeUnlock();
            }
        }
    }

    /**
     * Edit-log replay of a partition erase: drops the partition from the bin,
     * its recycle time and the name cache, then releases it via Env.
     */
    public void replayErasePartition(long partitionId) {
        writeLock();
        try {
            RecyclePartitionInfo removed = idToPartition.remove(partitionId);
            idToRecycleTime.remove(partitionId);

            if (removed == null) {
                LOG.warn("replayErasePartition: partitionInfo is null for partitionId[{}]", partitionId);
                return;
            }

            Partition partition = removed.getPartition();
            dbTblIdPartitionNameToIds.computeIfPresent(Pair.of(removed.getDbId(), removed.getTableId()),
                    (key, nameToIds) -> {
                        nameToIds.computeIfPresent(partition.getName(), (name, ids) -> {
                            ids.remove(partitionId);
                            return ids.isEmpty() ? null : ids;
                        });
                        return nameToIds.isEmpty() ? null : nameToIds;
                    });

            Env.getCurrentEnv().onErasePartition(partition);

            LOG.info("replay erase partition[{}]", partitionId);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Returns the ids that must be erased so at most {@code maxTrashNum} entries
     * remain, keeping the most recently recycled ones.
     *
     * <p>Candidates are ordered by recycle time descending on a private copy, so
     * the caller-supplied list is never mutated as a side effect (the previous
     * implementation sorted the argument in place). Stale ids — removed from
     * idToRecycleTime between snapshot and this call — default to time 0 and
     * therefore sort as the oldest.
     *
     * @param ids         candidate ids sharing the same name (not modified)
     * @param maxTrashNum maximum number of entries to keep
     * @return ids to erase, oldest recycle time first; empty if within quota
     */
    private List<Long> getIdListToEraseByRecycleTime(List<Long> ids, int maxTrashNum) {
        if (ids.size() <= maxTrashNum) {
            return new ArrayList<>();
        }
        List<Long> sorted = new ArrayList<>(ids);
        sorted.sort((x, y) -> Long.compare(
                idToRecycleTime.getOrDefault(y, 0L),
                idToRecycleTime.getOrDefault(x, 0L)));
        // everything past the quota (i.e. the oldest entries) gets erased
        return new ArrayList<>(sorted.subList(maxTrashNum, sorted.size()));
    }

    /**
     * Recovers a dropped database (and its recycled tables) from the bin.
     *
     * @param dbName name of the database to recover
     * @param dbId   exact db id to recover, or -1 to pick the most recently
     *               recycled database with that name
     * @return the recovered Database object
     * @throws DdlException if no matching database is found
     */
    public Database recoverDatabase(String dbName, long dbId) throws DdlException {
        writeLock();
        try {
            RecycleDatabaseInfo dbInfo = null;
            // The recycle time of the force dropped tables and databases will be set to zero, use 1 here to
            // skip these databases and tables.
            long recycleTime = 1;
            Iterator<Map.Entry<Long, RecycleDatabaseInfo>> iterator = idToDatabase.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, RecycleDatabaseInfo> entry = iterator.next();
                if (dbName.equals(entry.getValue().getDb().getFullName())) {
                    if (dbId == -1) {
                        // no id given: keep the candidate with the latest recycle time
                        if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
                            recycleTime = idToRecycleTime.get(entry.getKey());
                            dbInfo = entry.getValue();
                        }
                    } else if (entry.getKey() == dbId) {
                        dbInfo = entry.getValue();
                        break;
                    }
                }
            }

            if (dbInfo == null) {
                throw new DdlException("Unknown database '" + dbName + "' or database id '" + dbId + "'");
            }

            // 1. recover all tables in this db
            recoverAllTables(dbInfo);

            Database db = dbInfo.getDb();
            // 2. remove db from idToDatabase and idToRecycleTime
            idToDatabase.remove(db.getId());
            idToRecycleTime.remove(db.getId());

            // BUGFIX: clean the name cache using the resolved id, not the raw
            // `dbId` argument — that argument is -1 when recovering by name,
            // which previously left the real id stale in dbNameToIds.
            dbNameToIds.computeIfPresent(db.getFullName(), (k, v) -> {
                v.remove(db.getId());
                return v.isEmpty() ? null : v;
            });

            return db;
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replays a "recover database" edit log entry on a follower/loading FE.
     * Unlike the master-side path, {@code dbId} is always the concrete id.
     *
     * @param dbId id of the database being recovered
     * @return the recovered {@link Database}
     */
    public Database replayRecoverDatabase(long dbId) {
        writeLock();
        try {
            final RecycleDatabaseInfo recycledDb = idToDatabase.get(dbId);

            try {
                recoverAllTables(recycledDb);
            } catch (DdlException e) {
                // should not happened
                LOG.error("failed replay recover database: {}", dbId, e);
            }

            idToDatabase.remove(dbId);
            idToRecycleTime.remove(dbId);

            // drop this id from the name index, pruning the entry once empty
            dbNameToIds.computeIfPresent(recycledDb.getDb().getFullName(), (name, ids) -> {
                ids.remove(dbId);
                return ids.isEmpty() ? null : ids;
            });

            return recycledDb.getDb();
        } finally {
            writeUnlock();
        }
    }

    /**
     * Moves every table recorded in {@code dbInfo} out of the recycle bin and back
     * into the recovered database. Caller must hold the recycle bin write lock.
     *
     * @param dbInfo snapshot of the dropped database, carrying the names and ids of
     *               the tables it contained when it was dropped
     * @throws DdlException if any recorded table is no longer in the bin
     *                      (e.g. it was erased in the meantime) — no partial recover
     */
    private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws DdlException {
        Database db = dbInfo.getDb();
        // work on copies so we can track which recorded tables are still missing
        Set<String> tableNames = Sets.newHashSet(dbInfo.getTableNames());
        Set<Long> tableIds = Sets.newHashSet(dbInfo.getTableIds());
        long dbId = db.getId();
        Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
        while (iterator.hasNext() && !tableNames.isEmpty()) {
            Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
            RecycleTableInfo tableInfo = entry.getValue();
            // match on db id AND name AND table id, so an unrelated recycled table that
            // merely reuses the same name is not picked up by mistake
            if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName())
                    || !tableIds.contains(tableInfo.getTable().getId())) {
                continue;
            }

            Table table = tableInfo.getTable();
            // only OLAP tables are re-registered; other types are dropped from the bin
            // without being restored (see log line below)
            if (table.getType() == TableType.OLAP) {
                db.registerTable(table);
                LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
            } else {
                LOG.info("ignore recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
            }
            iterator.remove();
            idToRecycleTime.remove(table.getId());
            tableNames.remove(table.getName());

            // drop the id from the (dbId, tableName) -> ids index, pruning empty sets
            dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, table.getName()), (k, v) -> {
                v.remove(table.getId());
                return v.isEmpty() ? null : v;
            });
        }

        if (!tableNames.isEmpty()) {
            // at least one recorded table has vanished from the bin; refuse to recover
            throw new DdlException("Tables[" + tableNames + "] is missing. Can not recover db");
        }
    }

    /**
     * Recovers a dropped table from the recycle bin back into {@code db}.
     *
     * @param db           target database (caller is expected to hold the needed db lock,
     *                     per the inline comment below)
     * @param tableName    name the table had when it was dropped
     * @param tableId      id of the table to recover, or -1 to pick the most recently
     *                     dropped table with {@code tableName}
     * @param newTableName optional new name for the recovered table; null/empty keeps the old one
     * @return true on success
     * @throws DdlException if no matching table is in the bin, or the match is a
     *                      materialized view (cannot be recovered on its own)
     */
    public boolean recoverTable(Database db, String tableName, long tableId,
                                             String newTableName) throws DdlException {
        writeLock();
        try {
            // make sure to get db lock
            Table table = null;
            // The recycle time of the force dropped tables and databases will be set to zero, use 1 here to
            // skip these databases and tables.
            long recycleTime = 1;
            long dbId = db.getId();
            Iterator<Map.Entry<Long, RecycleTableInfo>> iterator = idToTable.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, RecycleTableInfo> entry = iterator.next();
                RecycleTableInfo tableInfo = entry.getValue();
                if (tableInfo.getDbId() != dbId) {
                    continue;
                }

                if (!tableInfo.getTable().getName().equals(tableName)) {
                    continue;
                }

                if (tableId == -1) {
                    // no id given: keep the candidate with the latest recycle time
                    if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
                        recycleTime = idToRecycleTime.get(entry.getKey());
                        table = tableInfo.getTable();
                    }
                } else if (entry.getKey() == tableId) {
                    table = tableInfo.getTable();
                    break;
                }
            }

            if (table == null) {
                throw new DdlException("Unknown table '" + tableName + "' or table id '" + tableId + "' in "
                    + db.getFullName());
            }

            if (table.getType() == TableType.MATERIALIZED_VIEW) {
                throw new DdlException("Can not recover materialized view '" + tableName + "' or table id '"
                        + tableId + "' in " + db.getFullName());
            }

            // iterator not needed: non-replay path removes from idToTable by id
            innerRecoverTable(db, table, tableName, newTableName, null, false);
            LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName());
            return true;
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replays a "recover table" edit log entry: finds the recycled table by id and
     * re-registers it into {@code db}, optionally under a new name.
     *
     * @param db           database the table is restored into
     * @param tableId      id of the recycled table
     * @param newTableName optional new name; null/empty keeps the old one
     */
    public void replayRecoverTable(Database db, long tableId, String newTableName) throws DdlException {
        writeLock();
        try {
            // make sure to get db write lock
            for (Iterator<Map.Entry<Long, RecycleTableInfo>> it = idToTable.entrySet().iterator(); it.hasNext();) {
                RecycleTableInfo recycled = it.next().getValue();
                Table recycledTable = recycled.getTable();
                if (recycledTable.getId() != tableId) {
                    continue;
                }
                Preconditions.checkState(recycled.getDbId() == db.getId());
                if (innerRecoverTable(db, recycledTable, recycledTable.getName(), newTableName, it, true)) {
                    break;
                }
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Shared recovery core for {@code recoverTable} and {@code replayRecoverTable}:
     * optionally renames the table, registers it into {@code db}, removes it from
     * the bin's bookkeeping maps and, on the master path, writes the edit log.
     *
     * @param db           database to restore the table into
     * @param table        the recycled table
     * @param tableName    name the table had in the bin (key into dbIdTableNameToIds)
     * @param newTableName optional new name; null/empty keeps the old one
     * @param iterator     live iterator over idToTable — used for removal ONLY when
     *                     isReplay is true (the replay caller is mid-iteration)
     * @param isReplay     true on the replay path: remove via iterator and skip edit log
     * @return always true (signals the replay caller it can stop scanning)
     * @throws DdlException if the new name is already taken in {@code db}
     */
    private boolean innerRecoverTable(Database db, Table table, String tableName, String newTableName,
                                                Iterator<Map.Entry<Long, RecycleTableInfo>> iterator,
                                                boolean isReplay) throws DdlException {
        table.writeLock();
        try {
            if (!Strings.isNullOrEmpty(newTableName)) {
                if (Env.isStoredTableNamesLowerCase()) {
                    newTableName = newTableName.toLowerCase();
                }
                if (!tableName.equals(newTableName)) {
                    // check if name is already used
                    if (db.getTable(newTableName).isPresent()) {
                        throw new DdlException("Table name[" + newTableName + "] is already used");
                    }

                    if (table.isManagedTable()) {
                        // olap table should also check if any rollup has same name as "newTableName"
                        ((OlapTable) table).checkAndSetName(newTableName, false);
                    } else {
                        table.setName(newTableName);
                    }
                }
            }

            db.registerTable(table);
            // replay path must remove through the caller's iterator to avoid a
            // ConcurrentModificationException; master path removes by id directly
            if (isReplay) {
                iterator.remove();
            } else {
                idToTable.remove(table.getId());
            }
            idToRecycleTime.remove(table.getId());

            // the index is keyed by the OLD name, even if the table was just renamed
            dbIdTableNameToIds.computeIfPresent(Pair.of(db.getId(), tableName), (k, v) -> {
                v.remove(table.getId());
                return v.isEmpty() ? null : v;
            });

            if (isReplay) {
                LOG.info("replay recover table[{}]", table.getId());
            } else {
                // log
                RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(),
                                                    -1L, "", table.getName(), newTableName, "", "");
                Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
            }
            // Only olap table need recover dynamic partition, other table like jdbc odbc view.. do not need it
            if (table.isManagedTable()) {
                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), (OlapTable) table, isReplay);
            }
        } finally {
            table.writeUnlock();
        }
        return true;
    }

    /**
     * Recovers a dropped partition from the recycle bin back into {@code table}.
     * Caller is expected to hold the table write lock (see inline comment).
     *
     * @param dbId                 id of the database owning {@code table}
     * @param table                table to re-attach the partition to
     * @param partitionName        name the partition had when dropped (matched case-insensitively)
     * @param partitionIdToRecover id of the partition, or -1 to pick the most recently
     *                             dropped partition with {@code partitionName}
     * @param newPartitionName     optional new name for the recovered partition
     * @throws DdlException if no match exists, the partition item conflicts with current
     *                      partitions, the index set (schema) changed, or the new name is taken
     */
    public void recoverPartition(long dbId, OlapTable table, String partitionName,
            long partitionIdToRecover, String newPartitionName) throws DdlException {
        writeLock();
        try {
            if (table.getType() == TableType.MATERIALIZED_VIEW) {
                throw new DdlException("Can not recover partition in materialized view: " + table.getName());
            }

            // NOTE(review): starts at -1 here, unlike the 1 used in recoverTable/recoverDatabase,
            // so a force-dropped partition (recycle time 0) would be eligible — confirm intended
            long recycleTime = -1;
            // make sure to get db write lock
            RecyclePartitionInfo recoverPartitionInfo = null;

            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
                RecyclePartitionInfo partitionInfo = entry.getValue();

                if (partitionInfo.getTableId() != table.getId()) {
                    continue;
                }

                if (!partitionInfo.getPartition().getName().equalsIgnoreCase(partitionName)) {
                    continue;
                }

                if (partitionIdToRecover == -1) {
                    // no id given: keep the candidate with the latest recycle time
                    if (recycleTime <= idToRecycleTime.get(entry.getKey())) {
                        recycleTime = idToRecycleTime.get(entry.getKey());
                        recoverPartitionInfo = partitionInfo;
                    }
                } else if (entry.getKey() == partitionIdToRecover) {
                    recoverPartitionInfo = partitionInfo;
                    break;
                }
            }

            if (recoverPartitionInfo == null) {
                throw new DdlException("No partition named '" + partitionName
                        + "' or partition id '" + partitionIdToRecover
                        + "' in table " + table.getName());
            }

            // rebuild the partition item matching the table's partitioning type
            PartitionInfo partitionInfo = table.getPartitionInfo();
            PartitionItem recoverItem = null;
            if (partitionInfo.getType() == PartitionType.RANGE) {
                recoverItem = new RangePartitionItem(recoverPartitionInfo.getRange());
            } else if (partitionInfo.getType() == PartitionType.LIST) {
                recoverItem = recoverPartitionInfo.getListPartitionItem();
            }
            // check if partition item is invalid
            if (partitionInfo.getAnyIntersectItem(recoverItem, false) != null) {
                throw new DdlException("Can not recover partition[" + partitionName + "]. Partition item conflict.");
            }

            // check if schema change happened since the drop: the table's index set must
            // be exactly the set of materialized indices the recycled partition carries
            Partition recoverPartition = recoverPartitionInfo.getPartition();
            Set<Long> tableIndex = table.getIndexIdToMeta().keySet();
            Set<Long> partitionIndex = recoverPartition.getMaterializedIndices(IndexExtState.ALL).stream()
                    .map(i -> i.getId()).collect(Collectors.toSet());
            if (!tableIndex.equals(partitionIndex)) {
                throw new DdlException("table's index not equal with partition's index. table's index=" + tableIndex
                        + ", partition's index=" + partitionIndex);
            }

            // check if partition name exists
            Preconditions.checkState(recoverPartition.getName().equalsIgnoreCase(partitionName));
            if (!Strings.isNullOrEmpty(newPartitionName)) {
                if (table.checkPartitionNameExist(newPartitionName)) {
                    throw new DdlException("Partition name[" + newPartitionName + "] is already used");
                }
                recoverPartition.setName(newPartitionName);
            }

            // recover partition
            table.addPartition(recoverPartition);

            // recover partition info: restore item, storage, replication and flags by id
            long partitionId = recoverPartition.getId();
            partitionInfo.setItem(partitionId, false, recoverItem);
            partitionInfo.setDataProperty(partitionId, recoverPartitionInfo.getDataProperty());
            partitionInfo.setReplicaAllocation(partitionId, recoverPartitionInfo.getReplicaAlloc());
            partitionInfo.setIsInMemory(partitionId, recoverPartitionInfo.isInMemory());
            partitionInfo.setIsMutable(partitionId, recoverPartitionInfo.isMutable());

            // remove from recycle bin
            idToPartition.remove(partitionId);
            idToRecycleTime.remove(partitionId);

            // bump the table version so readers see the recovered partition
            if (!Env.getCurrentEnv().invalidCacheForCloud()) {
                long version = table.getNextVersion();
                table.updateVisibleVersionAndTime(version, System.currentTimeMillis());
            }

            // drop the id from the nested (dbId, tableId) -> name -> ids index, pruning empties
            dbTblIdPartitionNameToIds.computeIfPresent(
                    Pair.of(recoverPartitionInfo.getDbId(),
                            recoverPartitionInfo.getTableId()), (pair, partitionMap) -> {
                        partitionMap.computeIfPresent(partitionName, (name, idSet) -> {
                            idSet.remove(recoverPartition.getId());
                            return idSet.isEmpty() ? null : idSet;
                        });
                        return partitionMap.isEmpty() ? null : partitionMap;
                    });

            // log
            RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "",
                                                        table.getName(), "", partitionName, newPartitionName);
            Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
            LOG.info("recover partition[{}]", partitionId);
        } finally {
            writeUnlock();
        }
    }

    // The caller should keep table write lock
    /**
     * Replays a "recover partition" edit log entry: re-attaches the recycled
     * partition to {@code table} and restores its partition info by id.
     *
     * @param table            table the partition is restored into
     * @param partitionId      id of the recycled partition
     * @param newPartitionName optional new name; null/empty keeps the old one
     * @throws DdlException if the new name is already used in {@code table}
     */
    public void replayRecoverPartition(OlapTable table, long partitionId,
                                                    String newPartitionName) throws DdlException {
        writeLock();
        try {
            Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
            Env currentEnv = Env.getCurrentEnv();
            while (iterator.hasNext()) {
                Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
                RecyclePartitionInfo recyclePartitionInfo = entry.getValue();
                if (recyclePartitionInfo.getPartition().getId() != partitionId) {
                    continue;
                }

                Preconditions.checkState(recyclePartitionInfo.getTableId() == table.getId());
                if (!Strings.isNullOrEmpty(newPartitionName)) {
                    if (table.checkPartitionNameExist(newPartitionName)) {
                        throw new DdlException("Partition name[" + newPartitionName + "] is already used");
                    }
                }
                // add first under the OLD name, then rename — mirrors the master-side order
                table.addPartition(recyclePartitionInfo.getPartition());
                if (!Strings.isNullOrEmpty(newPartitionName)) {
                    table.renamePartition(recyclePartitionInfo.getPartition().getName(), newPartitionName);
                }
                // rebuild the partition item matching the table's partitioning type
                PartitionInfo partitionInfo = table.getPartitionInfo();
                PartitionItem recoverItem = null;
                if (partitionInfo.getType() == PartitionType.RANGE) {
                    recoverItem = new RangePartitionItem(recyclePartitionInfo.getRange());
                } else if (partitionInfo.getType() == PartitionType.LIST) {
                    recoverItem = recyclePartitionInfo.getListPartitionItem();
                }
                partitionInfo.setItem(partitionId, false, recoverItem);
                partitionInfo.setDataProperty(partitionId, recyclePartitionInfo.getDataProperty());
                partitionInfo.setReplicaAllocation(partitionId, recyclePartitionInfo.getReplicaAlloc());
                partitionInfo.setIsInMemory(partitionId, recyclePartitionInfo.isInMemory());
                partitionInfo.setIsMutable(partitionId, recyclePartitionInfo.isMutable());

                // remove via the live iterator (we are mid-iteration over idToPartition)
                iterator.remove();
                idToRecycleTime.remove(partitionId);

                // bump the table version so readers see the recovered partition
                if (!currentEnv.invalidCacheForCloud()) {
                    long version = table.getNextVersion();
                    table.updateVisibleVersionAndTime(version, System.currentTimeMillis());
                }

                // drop the id from the nested (dbId, tableId) -> name -> ids index; note
                // the key is the partition's name as stored in the bin (pre-rename)
                dbTblIdPartitionNameToIds.computeIfPresent(
                        Pair.of(recyclePartitionInfo.getDbId(), table.getId()), (pair, partitionMap) -> {
                            partitionMap.computeIfPresent(
                                    recyclePartitionInfo.getPartition().getName(),
                                    (name, idSet) -> {
                                        idSet.remove(partitionId);
                                        return idSet.isEmpty() ? null : idSet;
                                    });
                            return partitionMap.isEmpty() ? null : partitionMap;
                        });

                LOG.info("replay recover partition[{}]", partitionId);
                break;
            }
        } finally {
            writeUnlock();
        }
    }

    // erase database in catalog recycle bin instantly
    /**
     * Permanently erases a recycled database plus any recycled tables/partitions that
     * still reference its id. Runs in three lock phases (write, then two read/collect
     * phases) rather than one long write section, so concurrent erases are tolerated:
     * a DdlException from a child erase means "already gone" and is skipped.
     *
     * @param dbId id of the database to erase
     * @throws DdlException only if nothing at all was erased (unknown id)
     */
    public void eraseDatabaseInstantly(long dbId) throws DdlException {
        // 1. erase db
        RecycleDatabaseInfo dbInfo;
        writeLock();
        try {
            dbInfo = idToDatabase.get(dbId);
            if (dbInfo != null) {
                Env.getCurrentEnv().eraseDatabase(dbId, true);
                idToDatabase.remove(dbId);
                idToRecycleTime.remove(dbId);

                // drop the id from the name index, pruning the entry once empty
                dbNameToIds.computeIfPresent(dbInfo.getDb().getFullName(), (k, v) -> {
                    v.remove(dbId);
                    return v.isEmpty() ? null : v;
                });

                String dbName = dbInfo.getDb().getName();
                LOG.info("erase db[{}]: {}", dbId, dbName);
            }
        } finally {
            writeUnlock();
        }

        // 2. collect tables with same dbId
        List<Long> tableIdToErase = new ArrayList<>();
        readLock();
        try {
            for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
                if (entry.getValue().getDbId() == dbId) {
                    tableIdToErase.add(entry.getKey());
                }
            }
        } finally {
            readUnlock();
        }
        // erase outside the read lock: eraseTableInstantly takes the write lock itself
        for (Long tableId : tableIdToErase) {
            try {
                eraseTableInstantly(tableId);
            } catch (DdlException e) {
                LOG.info("table[{}] already erased by concurrent operation, skip", tableId);
            }
        }

        // 3. collect partitions with same dbId
        List<Long> partitionIdToErase = new ArrayList<>();
        readLock();
        try {
            for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
                if (entry.getValue().getDbId() == dbId) {
                    partitionIdToErase.add(entry.getKey());
                }
            }
        } finally {
            readUnlock();
        }
        for (Long partitionId : partitionIdToErase) {
            try {
                erasePartitionInstantly(partitionId);
            } catch (DdlException e) {
                LOG.info("partition[{}] already erased by concurrent operation, skip", partitionId);
            }
        }

        // 4. determine if nothing is deleted
        if (dbInfo == null && tableIdToErase.isEmpty() && partitionIdToErase.isEmpty()) {
            throw new DdlException("Unknown database id '" + dbId + "'");
        }
    }

    // erase table in catalog recycle bin instantly
    /**
     * Permanently erases a recycled table, then any recycled partitions still
     * referencing its id. Child erases that fail because a concurrent operation got
     * there first are logged and skipped.
     *
     * @param tableId id of the table to erase
     * @throws DdlException only if neither the table nor any partition was erased
     */
    public void eraseTableInstantly(long tableId) throws DdlException {
        // 1. erase the table itself, if still present in the bin
        RecycleTableInfo erasedInfo;
        writeLock();
        try {
            erasedInfo = idToTable.get(tableId);
            if (erasedInfo != null) {
                long dbId = erasedInfo.getDbId();
                Table table = erasedInfo.getTable();
                TableType type = table.getType();
                if (type == TableType.OLAP || type == TableType.MATERIALIZED_VIEW) {
                    Env.getCurrentEnv().onEraseOlapTable(dbId, (OlapTable) table, false);
                }

                idToTable.remove(tableId);
                idToRecycleTime.remove(tableId);

                // drop the id from the (dbId, name) index, pruning the entry once empty
                dbIdTableNameToIds.computeIfPresent(Pair.of(dbId, table.getName()), (key, ids) -> {
                    ids.remove(tableId);
                    return ids.isEmpty() ? null : ids;
                });

                Env.getCurrentEnv().getEditLog().logEraseTable(tableId);
                LOG.info("erase db[{}]'s table[{}]: {}", dbId, tableId, table.getName());
            }
        } finally {
            writeUnlock();
        }

        // 2. collect (under read lock) the ids of recycled partitions of this table,
        // then erase them outside the lock since erasePartitionInstantly write-locks
        List<Long> partitionIdToErase;
        readLock();
        try {
            partitionIdToErase = idToPartition.entrySet().stream()
                    .filter(e -> e.getValue().getTableId() == tableId)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        } finally {
            readUnlock();
        }
        for (Long partitionId : partitionIdToErase) {
            try {
                erasePartitionInstantly(partitionId);
            } catch (DdlException e) {
                LOG.info("partition[{}] already erased by concurrent operation, skip", partitionId);
            }
        }

        // 3. nothing was erased at all -> the id is unknown
        if (erasedInfo == null && partitionIdToErase.isEmpty()) {
            throw new DdlException("Unknown table id '" + tableId + "'");
        }
    }

    // erase partition in catalog recycle bin instantly
    /**
     * Permanently erases a single recycled partition and logs the operation to the
     * edit log.
     *
     * @param partitionId id of the partition to erase
     * @throws DdlException if the partition is not in the recycle bin
     */
    public void erasePartitionInstantly(long partitionId) throws DdlException {
        writeLock();
        try {
            RecyclePartitionInfo recycleInfo = idToPartition.get(partitionId);
            if (recycleInfo == null) {
                throw new DdlException("No partition id '" + partitionId + "'");
            }

            Partition partition = recycleInfo.getPartition();
            Env.getCurrentEnv().onErasePartition(partition);

            idToPartition.remove(partitionId);
            idToRecycleTime.remove(partitionId);

            // drop the id from the nested (dbId, tableId) -> name -> ids index, pruning empties
            Pair<Long, Long> dbTblKey = Pair.of(recycleInfo.getDbId(), recycleInfo.getTableId());
            dbTblIdPartitionNameToIds.computeIfPresent(dbTblKey, (key, partitionMap) -> {
                partitionMap.computeIfPresent(partition.getName(), (name, idSet) -> {
                    idSet.remove(partitionId);
                    return idSet.isEmpty() ? null : idSet;
                });
                return partitionMap.isEmpty() ? null : partitionMap;
            });

            Env.getCurrentEnv().getEditLog().logErasePartition(partitionId);
            LOG.info("erase table[{}]'s partition[{}]: {}",
                    recycleInfo.getTableId(), partitionId, partition.getName());
        } finally {
            writeUnlock();
        }
    }

    // no need to use synchronized.
    // only called when loading image
    /**
     * Rebuilds tablet/replica entries in the global inverted index for everything in
     * the recycle bin after an image load. Covers whole recycled tables first, then
     * individually recycled partitions (whose owning table may live either in the
     * catalog or in the bin).
     */
    public void addTabletToInvertedIndex() {
        // no need to handle idToDatabase. Database is already empty before being put here

        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
        // idToTable
        for (RecycleTableInfo tableInfo : idToTable.values()) {
            Table table = tableInfo.getTable();
            // only managed (OLAP) tables own tablets
            if (!table.isManagedTable()) {
                continue;
            }

            long dbId = tableInfo.getDbId();
            OlapTable olapTable = (OlapTable) table;
            long tableId = olapTable.getId();
            for (Partition partition : olapTable.getAllPartitions()) {
                long partitionId = partition.getId();
                TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                    long indexId = index.getId();
                    int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                    for (Tablet tablet : index.getTablets()) {
                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
                        long tabletId = tablet.getId();
                        invertedIndex.addTablet(tabletId, tabletMeta);
                        for (Replica replica : tablet.getReplicas()) {
                            invertedIndex.addReplica(tabletId, replica);
                        }
                    }
                } // end for indices
            } // end for partitions
        }

        // idToPartition
        for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
            long dbId = partitionInfo.getDbId();
            long tableId = partitionInfo.getTableId();
            Partition partition = partitionInfo.getPartition();
            long partitionId = partition.getId();

            // we need to get olap table to get schema hash info
            // first find it in catalog. if not found, it should be in recycle bin
            OlapTable olapTable = null;
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                // just log. db should be in recycle bin
                if (!idToDatabase.containsKey(dbId)) {
                    LOG.error("db[{}] is neither in catalog nor in recycle bin"
                            + " when rebuilding inverted index from recycle bin, partition[{}]",
                            dbId, partitionId);
                    continue;
                }
            } else {
                olapTable = (OlapTable) db.getTableNullable(tableId);
            }

            if (olapTable == null) {
                if (!idToTable.containsKey(tableId)) {
                    LOG.error("table[{}] is neither in catalog nor in recycle bin"
                            + " when rebuilding inverted index from recycle bin, partition[{}]",
                            tableId, partitionId);
                    continue;
                }
                RecycleTableInfo tableInfo = idToTable.get(tableId);
                olapTable = (OlapTable) tableInfo.getTable();
            }
            Preconditions.checkNotNull(olapTable);
            // storage medium should be got from RecyclePartitionInfo, not from olap table. because olap table
            // does not have this partition any more
            TStorageMedium medium = partitionInfo.getDataProperty().getStorageMedium();
            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                long indexId = index.getId();
                int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                for (Tablet tablet : index.getTablets()) {
                    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, medium);
                    long tabletId = tablet.getId();
                    invertedIndex.addTablet(tabletId, tabletMeta);
                    for (Replica replica : tablet.getReplicas()) {
                        invertedIndex.addReplica(tabletId, replica);
                    }
                }
            } // end for indices
        }

    }

    @Override
    protected void runAfterCatalogReady() {
        final long now = System.currentTimeMillis();
        final int keepNum = Config.max_same_name_catalog_trash_num;
        // Erase in partition -> table -> db order so that a partition (table) is never
        // left stranded in the bin after its owning table (db) has been erased.
        erasePartition(now, keepNum);
        eraseTable(now, keepNum);
        eraseDatabase(now, keepNum);
    }

    /**
     * Builds the rows for SHOW CATALOG RECYCLE BIN: databases first, then tables,
     * then partitions, each group sorted by name and drop time.
     *
     * Row layouts (columns by index) — index 1 is always the name and index 5 the
     * drop-time string, which is what the sort comparators below rely on:
     *   Table:     [Type, Name, DbId, TableId, "", DropTime, DataSize, RemoteDataSize]
     *   Partition: [Type, Name, DbId, TableId, PartitionId, DropTime, DataSize, RemoteDataSize]
     *   Database:  [Type, Name, DbId, "", "", DropTime, DataSize, RemoteDataSize]
     *
     * @return concatenated rows: db rows, then table rows, then partition rows
     */
    public List<List<String>> getInfo() {
        readLock();
        try {
            // dbId -> (local data size, remote data size), accumulated from the db's
            // recycled tables and partitions so the Database rows can report totals
            Map<Long, Pair<Long, Long>> dbToDataSize = new HashMap<>();
            List<List<String>> tableInfos = Lists.newArrayList();
            for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
                List<String> info = Lists.newArrayList();
                info.add("Table");
                RecycleTableInfo tableInfo = entry.getValue();
                Table table = tableInfo.getTable();
                info.add(table.getName());
                info.add(String.valueOf(tableInfo.getDbId()));
                info.add(String.valueOf(entry.getKey()));
                info.add("");
                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
                // data size
                long dataSize = table.getDataSize(false);
                info.add(DebugUtil.printByteWithUnit(dataSize));
                // remote data size
                long remoteDataSize = table instanceof OlapTable ? ((OlapTable) table).getRemoteDataSize() : 0L;
                info.add(DebugUtil.printByteWithUnit(remoteDataSize));
                // calculate database data size
                dbToDataSize.compute(tableInfo.getDbId(), (k, v) -> {
                    if (v == null) {
                        return Pair.of(dataSize, remoteDataSize);
                    } else {
                        v.first += dataSize;
                        v.second += remoteDataSize;
                        return v;
                    }
                });

                tableInfos.add(info);
            }
            // sort by Name, DropTime
            tableInfos.sort((x, y) -> {
                int nameRet = x.get(1).compareTo(y.get(1));
                if (nameRet == 0) {
                    return x.get(5).compareTo(y.get(5));
                } else {
                    return nameRet;
                }
            });

            List<List<String>> partitionInfos = Lists.newArrayList();
            for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
                List<String> info = Lists.newArrayList();
                info.add("Partition");
                RecyclePartitionInfo partitionInfo = entry.getValue();
                Partition partition = partitionInfo.getPartition();
                info.add(partition.getName());
                info.add(String.valueOf(partitionInfo.getDbId()));
                info.add(String.valueOf(partitionInfo.getTableId()));
                info.add(String.valueOf(entry.getKey()));
                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
                // data size
                long dataSize = partition.getDataSize(false);
                info.add(DebugUtil.printByteWithUnit(dataSize));
                // remote data size
                long remoteDataSize = partition.getRemoteDataSize();
                info.add(DebugUtil.printByteWithUnit(remoteDataSize));
                // calculate database data size
                dbToDataSize.compute(partitionInfo.getDbId(), (k, v) -> {
                    if (v == null) {
                        return Pair.of(dataSize, remoteDataSize);
                    } else {
                        v.first += dataSize;
                        v.second += remoteDataSize;
                        return v;
                    }
                });

                partitionInfos.add(info);
            }
            // sort by Name, DropTime
            partitionInfos.sort((x, y) -> {
                int nameRet = x.get(1).compareTo(y.get(1));
                if (nameRet == 0) {
                    return x.get(5).compareTo(y.get(5));
                } else {
                    return nameRet;
                }
            });

            List<List<String>> dbInfos = Lists.newArrayList();
            for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
                List<String> info = Lists.newArrayList();
                info.add("Database");
                RecycleDatabaseInfo dbInfo = entry.getValue();
                Database db = dbInfo.getDb();
                info.add(db.getFullName());
                info.add(String.valueOf(entry.getKey()));
                info.add("");
                info.add("");
                //info.add(String.valueOf(idToRecycleTime.get(entry.getKey())));
                info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
                // data size
                Pair<Long, Long> dataSizePair = dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L));
                info.add(DebugUtil.printByteWithUnit(dataSizePair.first));
                // remote data size
                info.add(DebugUtil.printByteWithUnit(dataSizePair.second));

                dbInfos.add(info);
            }
            // sort by Name, DropTime
            dbInfos.sort((x, y) -> {
                int nameRet = x.get(1).compareTo(y.get(1));
                if (nameRet == 0) {
                    return x.get(5).compareTo(y.get(5));
                } else {
                    return nameRet;
                }
            });

            return Stream.of(dbInfos, tableInfos, partitionInfos)
                    .flatMap(Collection::stream).collect(Collectors.toList());
        } finally {
            readUnlock();
        }
    }

    /**
     * Aggregates the data held in the recycle bin, grouped by the database
     * each dropped object belonged to.
     *
     * @return map of db id -> (local data size, remote data size), in bytes
     */
    public Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
        readLock();
        try {
            Map<Long, Pair<Long, Long>> dbToRecycleSize = new HashMap<>();
            // Dropped tables: only OlapTable carries size information.
            for (RecycleTableInfo tableInfo : idToTable.values()) {
                Table table = tableInfo.getTable();
                if (table instanceof OlapTable) {
                    addRecycleSize(dbToRecycleSize, tableInfo.getDbId(),
                            table.getDataSize(false), ((OlapTable) table).getRemoteDataSize());
                }
            }
            // Dropped partitions.
            for (RecyclePartitionInfo partitionInfo : idToPartition.values()) {
                Partition partition = partitionInfo.getPartition();
                addRecycleSize(dbToRecycleSize, partitionInfo.getDbId(),
                        partition.getDataSize(false), partition.getRemoteDataSize());
            }
            return dbToRecycleSize;
        } finally {
            readUnlock();
        }
    }

    // Accumulates (dataSize, remoteDataSize) into the pair kept for dbId,
    // creating the pair on first sight of the database.
    private static void addRecycleSize(Map<Long, Pair<Long, Long>> sizes, long dbId,
            long dataSize, long remoteDataSize) {
        Pair<Long, Long> pair = sizes.get(dbId);
        if (pair == null) {
            sizes.put(dbId, Pair.of(dataSize, remoteDataSize));
        } else {
            pair.first += dataSize;
            pair.second += remoteDataSize;
        }
    }

    // Need to add read lock, because when calling /dump api to dump image,
    // this class is not protected by any lock, will throw ConcurrentModificationException.
    @Override
    public void write(DataOutput out) throws IOException {
        readLock();
        try {
            // Legacy image format: each map is persisted as a count followed by
            // (id, value) pairs. The section order (databases, tables,
            // partitions, recycle times, trailing gson payload) must match
            // what readFieldsWithGson expects.
            out.writeInt(idToDatabase.size());
            for (Map.Entry<Long, RecycleDatabaseInfo> dbEntry : idToDatabase.entrySet()) {
                out.writeLong(dbEntry.getKey());
                dbEntry.getValue().write(out);
            }

            out.writeInt(idToTable.size());
            for (Map.Entry<Long, RecycleTableInfo> tableEntry : idToTable.entrySet()) {
                out.writeLong(tableEntry.getKey());
                tableEntry.getValue().write(out);
            }

            out.writeInt(idToPartition.size());
            for (Map.Entry<Long, RecyclePartitionInfo> partitionEntry : idToPartition.entrySet()) {
                out.writeLong(partitionEntry.getKey());
                partitionEntry.getValue().write(out);
            }

            out.writeInt(idToRecycleTime.size());
            for (Map.Entry<Long, Long> timeEntry : idToRecycleTime.entrySet()) {
                out.writeLong(timeEntry.getKey());
                out.writeLong(timeEntry.getValue());
            }

            // Trailing gson snapshot of this object.
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        } finally {
            readUnlock();
        }
    }

    /**
     * Loads the state written by {@link #write(DataOutput)}: four
     * count-prefixed sections (databases, tables, partitions, recycle times)
     * followed by a trailing gson string. Besides the primary id-keyed maps,
     * this also rebuilds the secondary name-based indexes (dbNameToIds,
     * dbIdTableNameToIds, dbTblIdPartitionNameToIds), which are not persisted.
     */
    public void readFieldsWithGson(DataInput in) throws IOException {
        // Section 1: dropped databases.
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            long id = in.readLong();
            RecycleDatabaseInfo dbInfo = new RecycleDatabaseInfo();
            dbInfo.readFields(in);
            idToDatabase.put(id, dbInfo);
            // Rebuild db-name index; several dropped dbs may share a name.
            dbNameToIds.computeIfAbsent(dbInfo.getDb().getFullName(), k -> ConcurrentHashMap.newKeySet()).add(id);
        }

        // Section 2: dropped tables. Note RecycleTableInfo.read() is a
        // factory returning a new instance, hence the reassignment.
        count = in.readInt();
        for (int i = 0; i < count; i++) {
            long id = in.readLong();
            RecycleTableInfo tableInfo = new RecycleTableInfo();
            tableInfo = tableInfo.read(in);
            idToTable.put(id, tableInfo);
            // Rebuild (db id, table name) index.
            dbIdTableNameToIds.computeIfAbsent(Pair.of(tableInfo.getDbId(), tableInfo.getTable().getName()),
                    k -> ConcurrentHashMap.newKeySet()).add(id);
        }

        // Section 3: dropped partitions, also read via an instance factory.
        count = in.readInt();
        for (int i = 0; i < count; i++) {
            long id = in.readLong();
            RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo();
            partitionInfo = partitionInfo.read(in);
            idToPartition.put(id, partitionInfo);
            // Rebuild (db id, table id) -> partition name index.
            Pair<Long, Long> dbTblId = Pair.of(partitionInfo.getDbId(), partitionInfo.getTableId());
            dbTblIdPartitionNameToIds.computeIfAbsent(dbTblId, k -> new ConcurrentHashMap<>())
                    .computeIfAbsent(partitionInfo.getPartition().getName(), k -> ConcurrentHashMap.newKeySet())
                    .add(id);
        }

        // Section 4: object id -> drop timestamp.
        count = in.readInt();
        for (int i = 0; i < count; i++) {
            long id = in.readLong();
            long time = in.readLong();
            idToRecycleTime.put(id, time);
        }
        // The trailing json must be consumed to keep the stream position
        // correct; the deserialized object is discarded. NOTE(review):
        // presumably the payload exists only for format compatibility with
        // newer versions — confirm before relying on it carrying state.
        GsonUtils.GSON.fromJson(Text.readString(in), CatalogRecycleBin.class);
    }

    /**
     * Deserializes a whole recycle bin from an image stream.
     *
     * @param in stream positioned at data written by {@link #write(DataOutput)}
     * @return the reconstructed recycle bin
     */
    public static CatalogRecycleBin read(DataInput in) throws IOException {
        CatalogRecycleBin recycleBin = new CatalogRecycleBin();
        recycleBin.readFieldsWithGson(in);
        return recycleBin;
    }

    public class RecycleDatabaseInfo {
        private Database db;
        private Set<String> tableNames;
        private Set<Long> tableIds;
        // for compatibility in the future
        @SerializedName("u")
        private String unused;

        public RecycleDatabaseInfo() {
            tableNames = Sets.newHashSet();
            tableIds = Sets.newHashSet();
        }

        public RecycleDatabaseInfo(Database db, Set<String> tableNames, Set<Long> tableIds) {
            this.db = db;
            this.tableNames = tableNames;
            this.tableIds = tableIds;
        }

        public Database getDb() {
            return db;
        }

        public Set<String> getTableNames() {
            return tableNames;
        }

        public Set<Long> getTableIds() {
            return tableIds;
        }

        public Set<Long> setTableIds(Set<Long> tableIds) {
            return this.tableIds = tableIds;
        }

        public void write(DataOutput out) throws IOException {
            db.write(out);
            out.writeInt(tableNames.size());
            for (String tableName : tableNames) {
                Text.writeString(out, tableName);
            }
            out.writeInt(tableIds.size());
            for (Long tableId : tableIds) {
                out.writeLong(tableId);
            }
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        public void readFields(DataInput in) throws IOException {
            db = Database.read(in);

            int count  = in.readInt();
            for (int i = 0; i < count; i++) {
                String tableName = Text.readString(in);
                tableNames.add(tableName);
            }
            count = in.readInt();
            for (int i = 0; i < count; i++) {
                long tableId = in.readLong();
                tableIds.add(tableId);
            }
            GsonUtils.GSON.fromJson(Text.readString(in), RecycleDatabaseInfo.class);
        }
    }

    public class RecycleTableInfo {
        @SerializedName("did")
        private long dbId;
        @SerializedName("t")
        private Table table;

        public RecycleTableInfo() {
            // for persist
        }

        public RecycleTableInfo(long dbId, Table table) {
            this.dbId = dbId;
            this.table = table;
        }

        public long getDbId() {
            return dbId;
        }

        public Table getTable() {
            return table;
        }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        public RecycleTableInfo read(DataInput in) throws IOException {
            return GsonUtils.GSON.fromJson(Text.readString(in), RecycleTableInfo.class);
        }
    }

    public class RecyclePartitionInfo {
        @SerializedName("did")
        private long dbId;
        @SerializedName("tid")
        private long tableId;
        @SerializedName("p")
        private Partition partition;
        @SerializedName("r")
        private Range<PartitionKey> range;
        @SerializedName("lpi")
        private PartitionItem listPartitionItem;
        @SerializedName("dp")
        private DataProperty dataProperty;
        @SerializedName("ra")
        private ReplicaAllocation replicaAlloc;
        @SerializedName("im")
        private boolean isInMemory;
        @SerializedName("mu")
        private boolean isMutable = true;

        public RecyclePartitionInfo() {
            // for persist
        }

        public RecyclePartitionInfo(long dbId, long tableId, Partition partition,
                                    Range<PartitionKey> range, PartitionItem listPartitionItem,
                                    DataProperty dataProperty, ReplicaAllocation replicaAlloc,
                                    boolean isInMemory, boolean isMutable) {
            this.dbId = dbId;
            this.tableId = tableId;
            this.partition = partition;
            this.range = range;
            this.listPartitionItem = listPartitionItem;
            this.dataProperty = dataProperty;
            this.replicaAlloc = replicaAlloc;
            this.isInMemory = isInMemory;
            this.isMutable = isMutable;
        }

        public long getDbId() {
            return dbId;
        }

        public long getTableId() {
            return tableId;
        }

        public Partition getPartition() {
            return partition;
        }

        public Range<PartitionKey> getRange() {
            return range;
        }

        public PartitionItem getListPartitionItem() {
            return listPartitionItem;
        }

        public DataProperty getDataProperty() {
            return dataProperty;
        }

        public ReplicaAllocation getReplicaAlloc() {
            return replicaAlloc;
        }

        public boolean isInMemory() {
            return isInMemory;
        }

        public boolean isMutable() {
            return isMutable;
        }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, GsonUtils.GSON.toJson(this));
        }

        public RecyclePartitionInfo read(DataInput in) throws IOException {
            return GsonUtils.GSON.fromJson(Text.readString(in), RecyclePartitionInfo.class);
        }
    }

    /**
     * Returns a snapshot list of the ids of all databases in the recycle bin.
     * Currently only used when loading image, so no lock is taken.
     */
    public List<Long> getAllDbIds() {
        Set<Long> dbIds = idToDatabase.keySet();
        return Lists.newArrayList(dbIds);
    }

    /**
     * Empties the recycle bin entirely: both the primary id-keyed maps and
     * the name-based secondary indexes. Only for unit test.
     */
    public void clearAll() {
        writeLock();
        try {
            // Secondary name-based indexes.
            dbNameToIds.clear();
            dbIdTableNameToIds.clear();
            dbTblIdPartitionNameToIds.clear();
            // Primary id-keyed maps.
            idToDatabase.clear();
            idToTable.clear();
            idToPartition.clear();
            idToRecycleTime.clear();
            LOG.info("Cleared all objects in recycle bin");
        } finally {
            writeUnlock();
        }
    }
}