GlobalTransactionMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.metric.AutoMappedMetric;
import org.apache.doris.metric.GaugeMetricImpl;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Transaction Manager
 * 1. begin
 * 2. commit
 * 3. abort
 * Attention: every API of the txn manager must acquire the db lock or load lock first and only then the txn
 * manager's lock; any other order can lead to deadlock.
 */
public class GlobalTransactionMgr implements GlobalTransactionMgrIface {
    private static final Logger LOG = LogManager.getLogger(GlobalTransactionMgr.class);

    // Per-database transaction managers keyed by database id; filled by addDatabaseTransactionMgr and
    // emptied by removeDatabaseTransactionMgr. The constructor installs a concurrent map.
    private Map<Long, DatabaseTransactionMgr> dbIdToDatabaseTransactionMgrs;

    // Single id generator shared by every DatabaseTransactionMgr this class creates.
    private TransactionIdGenerator idGenerator;
    // Returned as-is by getCallbackFactory().
    private TxnStateCallbackFactory callbackFactory;

    // Handed to each DatabaseTransactionMgr created in addDatabaseTransactionMgr.
    private Env env;

    /**
     * Creates a transaction manager bound to {@code env}.
     *
     * @param env environment forwarded to every per-database transaction manager created later
     */
    public GlobalTransactionMgr(Env env) {
        this.env = env;
        this.idGenerator = new TransactionIdGenerator();
        this.callbackFactory = new TxnStateCallbackFactory();
        // thread-safe map; entries are added/removed via add/removeDatabaseTransactionMgr
        this.dbIdToDatabaseTransactionMgrs = Maps.newConcurrentMap();
    }

    /**
     * Hands the edit log over to the shared transaction id generator.
     *
     * @param editLog edit log used by the id generator
     */
    @Override
    public void setEditLog(EditLog editLog) {
        idGenerator.setEditLog(editLog);
    }

    /**
     * Returns the callback factory created by the constructor.
     *
     * @return the transaction state callback factory
     */
    @Override
    public TxnStateCallbackFactory getCallbackFactory() {
        return this.callbackFactory;
    }

    /**
     * Looks up the transaction manager registered for one database.
     *
     * @param dbId database id
     * @return the registered manager, never {@code null}
     * @throws AnalysisException if no manager is registered for {@code dbId}
     */
    protected DatabaseTransactionMgr getDatabaseTransactionMgr(long dbId) throws AnalysisException {
        return Optional.ofNullable(dbIdToDatabaseTransactionMgrs.get(dbId))
                .orElseThrow(() -> new AnalysisException("databaseTransactionMgr[" + dbId + "] does not exist"));
    }

    /**
     * Registers a transaction manager for {@code dbId} unless one is already registered.
     * A candidate manager is always constructed and is simply discarded when an entry already exists.
     *
     * @param dbId database id
     */
    @Override
    public void addDatabaseTransactionMgr(Long dbId) {
        DatabaseTransactionMgr candidate = new DatabaseTransactionMgr(dbId, env, idGenerator);
        boolean added = dbIdToDatabaseTransactionMgrs.putIfAbsent(dbId, candidate) == null;
        if (added && LOG.isDebugEnabled()) {
            LOG.debug("add database transaction manager for db {}", dbId);
        }
    }

    /**
     * Unregisters the transaction manager of {@code dbId}; a no-op when none is registered.
     *
     * @param dbId database id
     */
    @Override
    public void removeDatabaseTransactionMgr(Long dbId) {
        boolean removed = dbIdToDatabaseTransactionMgrs.remove(dbId) != null;
        if (removed && LOG.isDebugEnabled()) {
            LOG.debug("remove database transaction manager for db {}", dbId);
        }
    }

    /**
     * Begins a transaction with no request id and no listener.
     * Equivalent to the full overload called with {@code requestId = null} and {@code listenerId = -1}.
     */
    @Override
    public long beginTransaction(long dbId, List<Long> tableIdList, String label, TxnCoordinator coordinator,
            LoadJobSourceType sourceType, long timeoutSecond)
            throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException,
            QuotaExceedException, MetaNotFoundException {
        final TUniqueId noRequestId = null;
        final long noListenerId = -1L;
        return beginTransaction(dbId, tableIdList, label, noRequestId, coordinator, sourceType, noListenerId,
                timeoutSecond);
    }

    /**
     * Begins a transaction in database {@code dbId}.
     * <p>
     * {@code requestId} is used to decide whether a request is an internal retry: if the label already exists and
     * the request ids are equal, the existing transaction id is returned and this 'begin' is treated as a success.
     * A {@code null} requestId is accepted for compatibility.
     * <p>
     * Any failure other than {@link DuplicatedRequestException} increments the transaction-reject counter (when
     * metrics are initialized) before it is rethrown.
     *
     * @param coordinator coordinator that starts the transaction
     * @throws BeginTransactionException if the per-database manager refuses to begin the transaction
     * @throws DuplicatedRequestException if the request is recognised as a duplicate
     */
    @Override
    public long beginTransaction(long dbId, List<Long> tableIdList, String label, TUniqueId requestId,
            TxnCoordinator coordinator, LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
            throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException, DuplicatedRequestException,
            QuotaExceedException, MetaNotFoundException {
        try {
            if (Config.disable_load_job) {
                throw new AnalysisException("disable_load_job is set to true, all load jobs are prevented");
            }

            // stream loads are validated against their own upper bound on the timeout
            switch (sourceType) {
                case BACKEND_STREAMING:
                    checkValidTimeoutSecond(timeoutSecond, Config.max_stream_load_timeout_second,
                            Config.min_load_timeout_second);
                    break;
                default:
                    checkValidTimeoutSecond(timeoutSecond, Config.max_load_timeout_second,
                            Config.min_load_timeout_second);
                    break;
            }

            return getDatabaseTransactionMgr(dbId).beginTransaction(tableIdList, label, requestId,
                    coordinator, sourceType, listenerId, timeoutSecond);
        } catch (DuplicatedRequestException e) {
            // a retried request is not a rejection, so it is rethrown without being counted
            throw e;
        } catch (Exception e) {
            if (MetricRepo.isInit) {
                MetricRepo.COUNTER_TXN_REJECT.increase(1L);
            }
            throw e;
        }
    }

    /**
     * Pre-commits a two-phase-commit transaction while holding write locks on every table in {@code tableList}.
     *
     * @param timeoutMillis how long to wait for the table write locks
     * @throws UserException if the locks cannot be taken in time, or if the pre-commit itself fails
     */
    @Override
    public void preCommitTransaction2PC(Database db, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
            TxnCommitAttachment txnCommitAttachment)
            throws UserException {
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            preCommitTransaction2PC(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
    }

    /**
     * Pre-commits a 2PC transaction; the caller is expected to hold the tables' write locks already.
     *
     * @throws TransactionCommitFailedException if {@code Config.disable_load_job} is set
     * @throws UserException if the database has no transaction manager or the pre-commit fails
     */
    private void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
            throws UserException {
        if (Config.disable_load_job) {
            throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("try to pre-commit transaction: {}", transactionId);
        }
        getDatabaseTransactionMgr(dbId)
                .preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
    }

    /**
     * Commits a transaction without a commit attachment; callers must already hold the tables' write locks.
     *
     * @deprecated use the overload that also accepts a {@link TxnCommitAttachment}
     */
    @Deprecated
    @Override
    public void commitTransactionWithoutLock(long dbId, List<Table> tableList,
            long transactionId, List<TabletCommitInfo> tabletCommitInfos)
            throws UserException {
        final TxnCommitAttachment noAttachment = null;
        commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, noAttachment);
    }

    /**
     * No-op: this implementation ignores the commit response.
     *
     * @param commitTxnResponse ignored
     */
    @Override
    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
        // intentionally empty
    }

    /**
     * Commits a transaction without taking any table lock.
     *
     * @param dbId database id
     * @param tableList tables touched by the transaction
     * @param transactionId transaction to commit
     * @param tabletCommitInfos per-tablet commit information
     * @param txnCommitAttachment optional attachment, may be {@code null}
     * @throws UserException if the database has no transaction manager or the commit fails
     * @throws TransactionCommitFailedException if {@code Config.disable_load_job} is set
     * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
     * @note callers should get all tables' write locks before call this api
     */
    @Override
    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
            throws UserException {
        if (Config.disable_load_job) {
            throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("try to commit transaction: {}", transactionId);
        }
        // the trailing flag is passed as true only on the 2PC path (commitTransaction2PC) in this class
        getDatabaseTransactionMgr(dbId)
                .commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
    }

    /**
     * Commits a transaction made of sub-transactions without taking any table lock.
     * The {@code timeout} argument is accepted but not used by this method.
     *
     * @throws TransactionCommitFailedException if {@code Config.disable_load_job} is set
     * @throws UserException if the database has no transaction manager or the commit fails
     * @note callers should get all tables' write locks before call this api
     */
    public void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId,
            List<SubTransactionState> subTransactionStates, long timeout) throws UserException {
        if (Config.disable_load_job) {
            throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("try to commit transaction: {}", transactionId);
        }
        getDatabaseTransactionMgr(dbId).commitTransaction(transactionId, tableList, subTransactionStates);
    }

    /**
     * Commits a transaction while holding write locks on every table in {@code tableList}.
     *
     * @param timeoutMillis how long to wait for the table write locks
     * @throws UserException if the locks cannot be taken in time, or if the commit fails
     */
    @Override
    public void commitTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, TxnCommitAttachment txnCommitAttachment)
            throws UserException {
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
    }

    /**
     * Commits a pre-committed 2PC transaction. Table list, tablet commit infos and attachment are passed as
     * {@code null}, and the trailing flag is set to {@code true}.
     *
     * @throws TransactionCommitFailedException if {@code Config.disable_load_job} is set
     * @throws UserException if the database has no transaction manager or the commit fails
     */
    private void commitTransaction2PC(long dbId, long transactionId)
            throws UserException {
        if (Config.disable_load_job) {
            throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
        }
        getDatabaseTransactionMgr(dbId).commitTransaction(null, transactionId, null, null, true);
    }

    /**
     * Commits a transaction without an attachment, then waits for it to finish publishing.
     *
     * @return the result of waiting for the transaction; {@code false} when the time budget is exhausted
     */
    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
            throws UserException {
        final TxnCommitAttachment noAttachment = null;
        return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis,
                noAttachment);
    }

    /**
     * Commits a transaction under table write locks, then waits for it to finish publishing.
     * {@code timeoutMillis} is a single budget: the time spent committing is deducted from the publish wait.
     *
     * @return {@code false} if the commit alone used up the budget, otherwise the result of
     *         {@code waitForTransactionFinished}
     * @throws UserException if the locks cannot be taken in time, or if the commit fails
     */
    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
            List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
            TxnCommitAttachment txnCommitAttachment)
            throws UserException {
        final StopWatch commitTimer = new StopWatch();
        commitTimer.start();
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            commitTransactionWithoutLock(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
        commitTimer.stop();

        final long remainingMillis = timeoutMillis - commitTimer.getTime();
        final DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId());
        // committing already consumed the whole budget: report a publish timeout
        return remainingMillis >= 0
                && dbTransactionMgr.waitForTransactionFinished(db, transactionId, remainingMillis);
    }

    /**
     * Commits a transaction composed of sub-transactions, then waits for it to finish publishing.
     * The distinct tables of all sub-transactions are resolved in id order and write-locked for the commit;
     * the time spent committing is deducted from {@code timeoutMillis} before waiting.
     *
     * @return {@code false} if the commit alone used up the budget, otherwise the result of
     *         {@code waitForTransactionFinished}
     * @throws UserException if a table cannot be resolved, the locks time out, or the commit fails
     */
    public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
            List<SubTransactionState> subTransactionStates, long timeoutMillis) throws UserException {
        final StopWatch commitTimer = new StopWatch();
        commitTimer.start();
        List<Long> distinctTableIds = subTransactionStates.stream()
                .map(s -> s.getTable().getId())
                .distinct()
                .collect(Collectors.toList());
        List<? extends TableIf> resolvedTables = db.getTablesOnIdOrderOrThrowException(distinctTableIds);
        List<Table> tableList = resolvedTables.stream()
                .map(t -> (Table) t)
                .collect(Collectors.toList());
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            commitTransactionWithoutLock(db.getId(), tableList, transactionId, subTransactionStates, timeoutMillis);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
        commitTimer.stop();

        final long remainingMillis = timeoutMillis - commitTimer.getTime();
        final DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId());
        // committing already consumed the whole budget: report a publish timeout
        return remainingMillis >= 0
                && dbTransactionMgr.waitForTransactionFinished(db, transactionId, remainingMillis);
    }

    /**
     * Commits a pre-committed 2PC transaction under table write locks and logs how long it took.
     *
     * @param timeoutMillis how long to wait for the table write locks
     * @throws UserException if the locks cannot be taken in time, or if the commit fails
     */
    public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
            throws UserException {
        final StopWatch commitTimer = new StopWatch();
        commitTimer.start();
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            commitTransaction2PC(db.getId(), transactionId);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
        commitTimer.stop();
        LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms."
                + " data will be visible later.", transactionId, commitTimer.getTime());
    }

    /**
     * Aborts a transaction by id, write-locking those of its tables that still exist.
     * If the transaction has no recorded state, the call only logs and returns.
     *
     * @param dbId database of the transaction
     * @param transactionId transaction to abort
     * @param reason abort reason recorded on the transaction
     * @throws MetaNotFoundException if the database no longer exists, which means its tables cannot be resolved
     * @throws UserException if the database has no transaction manager, the table locks time out,
     *         or the abort itself fails
     */
    @Override
    public void abortTransaction(Long dbId, Long transactionId, String reason) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        TransactionState transactionState = getDatabaseTransactionMgr(dbId).getTransactionState(transactionId);
        if (transactionState == null) {
            LOG.info("try to cancel one txn which has no txn state. txn id: {}.", transactionId);
            return;
        }
        if (db == null) {
            // getDbNullable may return null (e.g. a concurrent drop); previously this surfaced as an NPE below
            throw new MetaNotFoundException("database " + dbId + " does not exist, cannot abort txn "
                    + transactionId);
        }
        List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
        abortTransaction(dbId, transactionId, reason, null, tableList);
    }

    /**
     * Aborts a transaction while holding write locks on {@code tableList}.
     * The lock wait is fixed at 5000 ms.
     *
     * @throws UserException if the database has no transaction manager, the locks time out, or the abort fails
     */
    @Override
    public void abortTransaction(Long dbId, Long txnId, String reason,
            TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            dbTransactionMgr.abortTransaction(txnId, reason, txnCommitAttachment);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
    }

    /**
     * Aborts the transaction that carries {@code label}; used by the HTTP cancel-stream-load API.
     *
     * @throws AnalysisException if no transaction id can be resolved for the label (this also happens when the
     *         database has no transaction manager)
     * @throws UserException if aborting the resolved transaction fails
     */
    @Override
    public void abortTransaction(Long dbId, String label, String reason) throws UserException {
        Long txnId = getTransactionId(dbId, label);
        if (txnId != null) {
            abortTransaction(dbId, txnId, reason);
            return;
        }
        throw new AnalysisException("txn with label " + label + " does not exist");
    }

    /**
     * Aborts a pre-committed 2PC transaction while holding write locks on {@code tableList}.
     * The lock wait is fixed at 5000 ms.
     *
     * @throws UserException if the database has no transaction manager, the locks time out, or the abort fails
     */
    @Override
    public void abortTransaction2PC(Long dbId, long transactionId, List<Table> tableList) throws UserException {
        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
        boolean locked = MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 5000,
                TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new UserException("get tableList write lock timeout, tableList=("
                    + StringUtils.join(tableList, ",") + ")");
        }
        try {
            dbTransactionMgr.abortTransaction2PC(transactionId);
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
    }

    /**
     * Collects the committed transactions of every registered database as publish candidates.
     * No version filtering happens here: a ready-to-publish txn is expected to have a partition visible version
     * exactly ONE less than its commit version, but that condition is not checked by this method.
     *
     * @return a new mutable list of all committed transactions
     */
    public List<TransactionState> getReadyToPublishTransactions() {
        return dbIdToDatabaseTransactionMgrs.values().stream()
                .flatMap(mgr -> mgr.getCommittedTxnList().stream())
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Checks whether committed transactions exist in a database, optionally narrowed to one table or partition.
     * <ul>
     *   <li>{@code tableId} and {@code partitionId} both {@code null}: any committed txn in the database.</li>
     *   <li>{@code partitionId == null}: any committed txn whose table list contains {@code tableId}.</li>
     *   <li>otherwise: any committed txn with a commit info for that table and partition.</li>
     * </ul>
     *
     * @return {@code true} if a matching committed transaction exists; {@code false} when none exists or when no
     *         transaction manager is registered for {@code dbId}
     */
    public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) {
        DatabaseTransactionMgr dbTransactionMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
        if (dbTransactionMgr == null) {
            // no manager registered (e.g. db already removed): there are no committed txns to report.
            // Previously this dereferenced null.
            return false;
        }

        List<TransactionState> committedTxnList = dbTransactionMgr.getCommittedTxnList();
        if (tableId == null && partitionId == null) {
            return !committedTxnList.isEmpty();
        }

        for (TransactionState transactionState : committedTxnList) {
            if (!transactionState.getTableIdList().contains(tableId)) {
                continue;
            }
            if (partitionId == null) {
                return true;
            }
            TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tableId);
            if (tableCommitInfo == null) {
                // FIXME: this is a bug, should not happen
                // If table id is in transaction state's table list, and it is COMMITTED,
                // table commit info should not be null.
                // return true to avoid following process.
                LOG.warn("unexpected error. tableCommitInfo is null. dbId: {} tableId: {}, partitionId: {},"
                                + " transactionState: {}",
                        dbId, tableId, partitionId, transactionState);
                return true;
            }
            if (tableCommitInfo.getPartitionCommitInfo(partitionId) != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Decides whether the coordinator of {@code txn} should be regarded as failed.
     * <ul>
     *   <li>FE coordinator: failed if no frontend has the coordinator's host, or if a frontend with that host has
     *       a last startup time later than the coordinator's recorded start time.</li>
     *   <li>BE coordinator: failed if no backend has the coordinator's id. Otherwise it is failed only when the
     *       backend's host equals the coordinator's ip and the backend either started after the coordinator's
     *       start time, or is not alive and its last update is at least
     *       {@code Config.abort_txn_after_lost_heartbeat_time_second} seconds old.</li>
     *   <li>Any other coordinator source type: always regarded as failed.</li>
     * </ul>
     *
     * @param txn transaction whose coordinator is checked
     * @return {@code true} if the coordinator is considered offline or restarted
     */
    public static boolean checkFailedTxnsByCoordinator(TransactionState txn) {
        TxnCoordinator coordinator = txn.getCoordinator();
        // stays true unless a matching FE/BE is found below
        boolean offline = true;
        if (coordinator.sourceType == TransactionState.TxnSourceType.FE) {
            List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
            for (Frontend fe : frontends) {
                if (fe.getHost().equals(coordinator.ip)) {
                    offline = false;
                    // the FE restarted after the txn's coordinator started
                    if (fe.getLastStartupTime() > coordinator.startTime) {
                        return true;
                    }
                }
            }
        } else if (coordinator.sourceType == TransactionState.TxnSourceType.BE) {
            Backend be = Env.getCurrentSystemInfo().getBackend(coordinator.id);
            if (be != null) {
                offline = false;
                // same host, and either restarted since the txn began or dead for longer than the configured limit
                if (be.getHost().equals(coordinator.ip) && (be.getLastStartTime() > coordinator.startTime
                        || (!be.isAlive() && System.currentTimeMillis() - be.getLastUpdateMs()
                                    >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L))) {
                    return true;
                }
            }
        }
        return offline;
    }

    /**
     * Keeps only the transactions whose coordinator is considered failed by
     * {@link #checkFailedTxnsByCoordinator(TransactionState)}.
     *
     * @param conflictTxns transactions to inspect
     * @return a new mutable list, preserving the input order
     */
    public static List<TransactionState> checkFailedTxns(List<TransactionState> conflictTxns) {
        return conflictTxns.stream()
                .filter(GlobalTransactionMgr::checkFailedTxnsByCoordinator)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Returns the loads in {@code dbId} on {@code tableIdList} that began before {@code endTransactionId} and have
     * not finished yet, as reported by the database's transaction manager.
     *
     * @throws UserException if the database has no transaction manager; the original AnalysisException is kept
     *         as the cause
     */
    public List<TransactionState> getUnFinishedPreviousLoad(long endTransactionId,
            long dbId, List<Long> tableIdList) throws UserException {
        try {
            DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
            return dbTransactionMgr.getUnFinishedPreviousLoad(endTransactionId, tableIdList);
        } catch (AnalysisException e) {
            // NOTICE: At present, this situation will only happen when the database no longer exists.
            // In fact, getDatabaseTransactionMgr() should explicitly throw a MetaNotFoundException,
            // but changing the type of exception will cause a large number of code changes,
            // which is not worth the loss.
            // So here just simply think that AnalysisException only means that db does not exist.
            LOG.warn("Get unfinished previous loads in db [{}] failed", dbId, e);
            // keep the original exception as the cause instead of dropping its stack trace
            throw new UserException(e.getMessage(), e);
        }
    }

    /**
     * Finishes (publishes) a committed transaction through the database's transaction manager.
     * Per the original contract, partitions of a table deleted between commit and publish are to be ignored;
     * that handling lives in {@code DatabaseTransactionMgr.finishTransaction}.
     *
     * @param dbId database of the transaction
     * @param transactionId transaction to finish
     * @param partitionVisibleVersions forwarded unchanged to the per-database manager
     * @param backendPartitions forwarded unchanged to the per-database manager
     * @throws UserException if the database has no transaction manager or finishing fails
     */
    public void finishTransaction(long dbId, long transactionId, Map<Long, Long> partitionVisibleVersions,
            Map<Long, Set<Long>> backendPartitions) throws UserException {
        getDatabaseTransactionMgr(dbId).finishTransaction(transactionId, partitionVisibleVersions, backendPartitions);
    }

    /**
     * Checks whether every transaction on {@code tableIdList} that began before {@code endTransactionId}
     * has finished, e.g. before deciding whether a load job may proceed.
     *
     * @throws AnalysisException if the database has no transaction manager (in practice: it no longer exists);
     *         the failure is logged before being rethrown
     */
    @Override
    public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List<Long> tableIdList)
            throws AnalysisException {
        try {
            return getDatabaseTransactionMgr(dbId).isPreviousTransactionsFinished(endTransactionId, tableIdList);
        } catch (AnalysisException e) {
            // NOTICE: At present, this situation will only happen when the database no longer exists.
            // getDatabaseTransactionMgr() would ideally throw MetaNotFoundException, but changing the
            // exception type would ripple through many callers, so AnalysisException is treated here as
            // "db does not exist".
            LOG.warn("Check whether all previous transactions in db [" + dbId + "] finished failed", e);
            throw e;
        }
    }

    /**
     * Checks whether every transaction on one partition of one table that began before
     * {@code endTransactionId} has finished.
     *
     * @throws AnalysisException if the database has no transaction manager (in practice: it no longer exists)
     */
    @Override
    public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, long tableId,
            long partitionId) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId)
                .isPreviousTransactionsFinished(endTransactionId, tableId, partitionId);
    }

    /**
     * Runs the per-database cleanup of expired and timed-out transactions; the txn cleaner calls this
     * at a fixed interval.
     * <ul>
     *   <li>expired: txn is VISIBLE or ABORTED and has expired</li>
     *   <li>timeout: txn is still PREPARE but has timed out</li>
     * </ul>
     * All databases are evaluated against the same "now" timestamp.
     */
    @Override
    public void removeExpiredAndTimeoutTxns() {
        final long now = System.currentTimeMillis();
        dbIdToDatabaseTransactionMgrs.values().forEach(mgr -> mgr.removeExpiredAndTimeoutTxns(now));
    }

    /**
     * Removes {@code label} through the database's transaction manager.
     *
     * @throws Exception if the database has no transaction manager or cleaning fails
     */
    @Override
    public void cleanLabel(Long dbId, String label, boolean isReplay) throws Exception {
        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
        dbTransactionMgr.cleanLabel(label, isReplay);
    }

    /**
     * Replaces the table ids of a running multi-table transaction.
     *
     * @throws UserException if the database has no transaction manager or the update fails
     */
    @Override
    public void updateMultiTableRunningTransactionTableIds(Long dbId, Long transactionId, List<Long> tableIds)
            throws UserException {
        DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
        dbTransactionMgr.updateMultiTableRunningTransactionTableIds(transactionId, tableIds);
    }

    /**
     * Polls the status of the transaction named by {@code request}: by txn id when one is set, otherwise by label.
     * Polling happens roughly once per second, for at most {@code Config.commit_timeout_second} rounds.
     * <p>
     * Returns as soon as the status is UNKNOWN or final. If polling ends while the transaction is COMMITTED, that
     * status is returned; otherwise a {@link TimeoutException} is thrown. If the thread is interrupted while
     * sleeping, its interrupt flag is restored and polling stops early.
     *
     * @throws AnalysisException if the database does not exist, or the transaction (looked up by id) does not exist
     * @throws TimeoutException if polling ends without a final, UNKNOWN or COMMITTED status
     */
    @Override
    public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request)
            throws AnalysisException, TimeoutException {
        long dbId = request.getDbId();
        int commitTimeoutSec = Config.commit_timeout_second;
        TransactionStatus txnStatus = null;
        for (int i = 0; i < commitTimeoutSec; ++i) {
            // re-check every round that the database still exists
            Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId);
            TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
            statusResult.status = new TStatus();
            if (request.isSetTxnId()) {
                long txnId = request.getTxnId();
                TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, txnId);
                if (txnState == null) {
                    throw new AnalysisException("txn does not exist: " + txnId);
                }
                txnStatus = txnState.getTransactionStatus();
                if (!txnState.getReason().trim().isEmpty()) {
                    statusResult.status.setErrorMsgsIsSet(true);
                    statusResult.status.addToErrorMsgs(txnState.getReason());
                }
            } else {
                txnStatus = getLabelState(dbId, request.getLabel());
            }
            if (txnStatus == TransactionStatus.UNKNOWN || txnStatus.isFinalStatus()) {
                statusResult.setTxnStatusId(txnStatus.value());
                return statusResult;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the flag so callers can observe the interruption, and stop polling: with the flag set,
                // every further sleep would fail immediately and turn the loop into a busy spin.
                Thread.currentThread().interrupt();
                LOG.warn("commit sleep exception.", e);
                break;
            }
        }
        if (txnStatus == TransactionStatus.COMMITTED) {
            TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
            statusResult.status = new TStatus();
            statusResult.setTxnStatusId(txnStatus.value());
            return statusResult;
        }
        throw new TimeoutException("Operation is timeout, txn status is " + txnStatus);
    }

    /**
     * Records the used data quota of a database in its transaction manager.
     *
     * @throws AnalysisException if the database has no transaction manager
     */
    @Override
    public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) throws AnalysisException {
        getDatabaseTransactionMgr(dbId).updateDatabaseUsedQuotaData(usedQuotaDataBytes);
    }

    /**
     * Aborts the prepared transactions coordinated by a BE whose coordinator start time is positive and earlier
     * than {@code beStartTime}, i.e. transactions started by a previous run of that BE.
     * Table write locks are NOT taken. Per-transaction failures are logged and do not stop the loop.
     *
     * @param coordinateBeId id of the coordinating backend
     * @param coordinateHost host of the coordinating backend, used in the lookup and in log messages
     * @param beStartTime new start time of the backend
     */
    @Override
    public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) {
        List<Pair<Long, Long>> transactionIdByCoordinateBe
                = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE);
        for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
            try {
                DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
                TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second);
                if (transactionState == null) {
                    // presumably removed after it was listed; an NPE here used to abort the whole loop
                    continue;
                }
                long coordStartTime = transactionState.getCoordinator().startTime;
                if (coordStartTime > 0 && coordStartTime < beStartTime) {
                    // does not hold table write lock
                    dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null);
                }
            } catch (UserException e) {
                LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());
            }
        }
    }

    /**
     * Aborts up to {@code limit} prepared transactions coordinated by a BE that is down.
     * Without this, such a transaction would stay in the FE until it timed out, so the FE cancels it proactively
     * once it notices that the coordinating BE is down. Table write locks are NOT taken, and per-transaction
     * failures are only logged.
     */
    @Override
    public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) {
        List<Pair<Long, Long>> pendingTxns = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost,
                limit);
        for (Pair<Long, Long> pending : pendingTxns) {
            try {
                // does not hold table write lock
                getDatabaseTransactionMgr(pending.first)
                        .abortTransaction(pending.second, "coordinate BE is down", null);
            } catch (UserException e) {
                LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());
            }
        }
    }

    /**
     * Returns the status of the transaction that carries {@code label}.
     *
     * @return the status, or {@link TransactionStatus#UNKNOWN} if the database has no transaction manager
     */
    @Override
    public TransactionStatus getLabelState(long dbId, String label) {
        DatabaseTransactionMgr dbTransactionMgr;
        try {
            dbTransactionMgr = getDatabaseTransactionMgr(dbId);
        } catch (AnalysisException e) {
            LOG.warn("Get transaction status by label " + label + " failed", e);
            return TransactionStatus.UNKNOWN;
        }
        return dbTransactionMgr.getLabelState(label);
    }

    /**
     * Resolves the transaction id bound to {@code label} in database {@code dbId}.
     *
     * @return the transaction id, or {@code null} when the database transaction manager cannot
     *         be obtained (the failure is logged)
     */
    @Override
    public Long getTransactionId(Long dbId, String label) {
        try {
            return getDatabaseTransactionMgr(dbId).getTransactionIdByLabel(label);
        } catch (AnalysisException e) {
            LOG.warn("Get transaction id by label " + label + " failed", e);
        }
        return null;
    }

    /**
     * Fetches the state object of a transaction.
     *
     * @param dbId          database the transaction belongs to
     * @param transactionId transaction id
     * @return whatever the database transaction manager returns, or {@code null} when the
     *         manager cannot be obtained (the failure is logged)
     */
    public TransactionState getTransactionState(long dbId, long transactionId) {
        TransactionState state = null;
        try {
            state = getDatabaseTransactionMgr(dbId).getTransactionState(transactionId);
        } catch (AnalysisException e) {
            LOG.warn("Get transaction {} in db {} failed. msg: {}", transactionId, dbId, e.getMessage());
        }
        return state;
    }

    /**
     * Lists the pre-committed transactions of a database.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    @Override
    public List<TransactionState> getPreCommittedTxnList(Long dbId) throws AnalysisException {
        DatabaseTransactionMgr dbMgr = getDatabaseTransactionMgr(dbId);
        return dbMgr.getPreCommittedTxnList();
    }

    /**
     * Resolves the transaction id for {@code label}, restricted to the given statuses.
     *
     * @throws UserException propagated from the database transaction manager lookup or query
     */
    @Override
    public Long getTransactionIdByLabel(Long dbId, String label, List<TransactionStatus> statusList)
            throws UserException {
        DatabaseTransactionMgr dbMgr = getDatabaseTransactionMgr(dbId);
        return dbMgr.getTransactionIdByLabel(label, statusList);
    }

    /**
     * Sums the transaction counts of every database transaction manager.
     *
     * <p>Not thread safe: it is only invoked by the checkpoint thread (without any lock) or by the
     * image dump thread (holding the db lock).
     */
    @Override
    public int getTransactionNum() {
        int total = 0;
        for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
            total += mgr.getTransactionNum();
        }
        return total;
    }

    /**
     * Builds one row per known database, ordered by db id: {@code [dbId, dbFullName, runningTxnNum]}.
     * Ids with no database in the internal catalog are skipped. A final
     * {@code ["0", "Total", sumOfRunningTxnNums]} row is appended.
     */
    @Override
    public List<List<String>> getDbInfo() {
        List<Long> sortedDbIds = Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
        Collections.sort(sortedDbIds);

        List<List<String>> rows = new ArrayList<>();
        long total = 0;
        for (long dbId : sortedDbIds) {
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                // no catalog entry for this id: nothing to show
                continue;
            }
            long running = 0;
            try {
                running = getDatabaseTransactionMgr(dbId).getRunningTxnNums();
                total += running;
            } catch (AnalysisException e) {
                // still emit the row, with a running count of 0
                LOG.warn("get database running transaction num failed", e);
            }
            rows.add(Lists.newArrayList(String.valueOf(dbId), db.getFullName(), String.valueOf(running)));
        }
        rows.add(Arrays.asList("0", "Total", String.valueOf(total)));
        return rows;
    }

    /**
     * Returns the per-state transaction info rows of a database.
     *
     * @return the rows, or an empty list when the database transaction manager cannot be
     *         obtained (the failure is logged)
     */
    @Override
    public List<List<String>> getDbTransStateInfo(Long dbId) {
        List<List<String>> stateInfo;
        try {
            stateInfo = getDatabaseTransactionMgr(dbId).getDbTransStateInfo();
        } catch (AnalysisException e) {
            LOG.warn("Get db [" + dbId + "] transactions info failed", e);
            stateInfo = Lists.newArrayList();
        }
        return stateInfo;
    }

    /**
     * Returns transaction info rows of a database, filtered by {@code running} and capped by
     * {@code limit}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    @Override
    public List<List<String>> getDbTransInfo(Long dbId, boolean running, int limit) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getTxnStateInfoList(running, limit);
    }

    /**
     * Returns the running-transaction info of a database.
     *
     * @return the manager's map, or a new empty map when the database has no transaction
     *         manager or the manager returns {@code null}
     */
    public Map<Long, List<Long>> getDbRunningTransInfo(long dbId) throws AnalysisException {
        DatabaseTransactionMgr dbMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
        Map<Long, List<Long>> runningInfo = (dbMgr == null) ? null : dbMgr.getDbRunningTransInfo();
        if (runningInfo == null) {
            return Maps.newHashMap();
        }
        return runningInfo;
    }

    /**
     * Returns transaction info rows of a database whose status equals {@code status}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    @Override
    public List<List<String>> getDbTransInfoByStatus(Long dbId, TransactionStatus status) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getTxnStateInfoList(status);
    }

    /**
     * Returns transaction info rows of a database matched by {@code label}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    @Override
    public List<List<String>> getDbTransInfoByLabelMatch(long dbId, String label) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getTxnStateInfoList(label);
    }

    /**
     * Counts, across all database transaction managers, the transactions in {@code status}.
     */
    @Override
    public long getTxnNumByStatus(TransactionStatus status) {
        long total = 0;
        for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) {
            total += mgr.getTxnNumByStatus(status);
        }
        return total;
    }

    /**
     * Returns the show-info rows of the single transaction {@code txnId}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    @Override
    public List<List<String>> getSingleTranInfo(long dbId, long txnId) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getSingleTranInfo(dbId, txnId);
    }

    /**
     * Returns the per-table info rows of transaction {@code txnId}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    public List<List<Comparable>> getTableTransInfo(long dbId, long txnId) throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getTableTransInfo(txnId);
    }

    /**
     * Returns the per-partition info rows of transaction {@code tid} for table {@code tableId}.
     *
     * @throws AnalysisException if the database transaction manager cannot be obtained
     */
    public List<List<Comparable>> getPartitionTransInfo(long dbId, long tid, long tableId)
            throws AnalysisException {
        return getDatabaseTransactionMgr(dbId).getPartitionTransInfo(tid, tableId);
    }

    /** Returns the generator this manager uses for transaction ids. */
    @Override
    public TransactionIdGenerator getTransactionIDGenerator() {
        return idGenerator;
    }

    /**
     * Finds a transaction of database {@code dbId} by callback id, restricted to {@code status}.
     *
     * @return the matching state, or {@code null} when the database transaction manager cannot
     *         be obtained (the failure is logged)
     */
    @Deprecated
    private TransactionState getTransactionStateByCallbackIdAndStatus(
            long dbId, long callbackId, Set<TransactionStatus> status) {
        TransactionState found = null;
        try {
            found = getDatabaseTransactionMgr(dbId).getTransactionStateByCallbackIdAndStatus(callbackId, status);
        } catch (AnalysisException e) {
            LOG.warn("Get transaction by callbackId and status failed", e);
        }
        return found;
    }

    /**
     * Finds a transaction of database {@code dbId} by callback id.
     *
     * @return the matching state, or {@code null} when the database transaction manager cannot
     *         be obtained (the failure is logged)
     */
    @Deprecated
    private TransactionState getTransactionStateByCallbackId(long dbId, long callbackId) {
        TransactionState found = null;
        try {
            found = getDatabaseTransactionMgr(dbId).getTransactionStateByCallbackId(callbackId);
        } catch (AnalysisException e) {
            LOG.warn("Get transaction by callbackId failed", e);
        }
        return found;
    }

    /**
     * Collects at most {@code limit} {@code (dbId, txnId)} pairs coordinated by the given BE.
     * Each database transaction manager is asked in turn, and the scan stops as soon as enough
     * pairs have been gathered.
     *
     * @param coordinateBeId id of the coordinator BE
     * @param coordinateHost host of the coordinator BE
     * @param limit          maximum number of pairs to return; a value {@code <= 0} yields an
     *                       empty list
     * @return a mutable list of at most {@code limit} pairs
     */
    protected List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long coordinateBeId,
            String coordinateHost, int limit) {
        ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
        if (limit <= 0) {
            // subList(0, limit) below would throw for a negative limit
            return txnInfos;
        }
        for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
            txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
                        coordinateBeId, coordinateHost, limit));
            // Stop once the quota is filled. With `>` instead of `>=`, a database returning exactly
            // `limit` txns triggered one more database scan whose results were then discarded.
            if (txnInfos.size() >= limit) {
                break;
            }
        }
        return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos;
    }

    /**
     * Returns the total number of running transactions. As a side effect, it refreshes the
     * per-database {@code DB_GAUGE_TXN_NUM} gauge.
     */
    @Override
    public long getAllRunningTxnNum() {
        Function<DatabaseTransactionMgr, Long> runningTxnCounter =
                mgr -> (long) mgr.getRunningTxnNumsWithLock();
        return updateTxnMetric(runningTxnCounter, MetricRepo.DB_GAUGE_TXN_NUM);
    }

    /**
     * Returns the total number of committed (awaiting publish) transactions. As a side effect,
     * it refreshes the per-database {@code DB_GAUGE_PUBLISH_TXN_NUM} gauge.
     */
    @Override
    public long getAllPublishTxnNum() {
        Function<DatabaseTransactionMgr, Long> committedTxnCounter =
                mgr -> (long) mgr.getCommittedTxnList().size();
        return updateTxnMetric(committedTxnCounter, MetricRepo.DB_GAUGE_PUBLISH_TXN_NUM);
    }

    /**
     * Evaluates {@code metricSupplier} on every database transaction manager. Each value is
     * stored into {@code metric} under the database's full name (databases missing from the
     * internal catalog get no gauge), and all values are summed.
     *
     * @return the sum over all managers, including those without a catalog database
     */
    private long updateTxnMetric(Function<DatabaseTransactionMgr, Long> metricSupplier,
            AutoMappedMetric<GaugeMetricImpl<Long>> metric) {
        long sum = 0L;
        for (DatabaseTransactionMgr dbMgr : dbIdToDatabaseTransactionMgrs.values()) {
            long value = metricSupplier.apply(dbMgr);
            sum += value;
            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbMgr.getDbId());
            if (db == null) {
                continue;
            }
            metric.getOrAdd(db.getFullName()).setValue(value);
        }
        return sum;
    }

    /**
     * Returns the running transaction count of a database, or 0 if it has no transaction manager.
     */
    @Override
    public int getRunningTxnNums(Long dbId) throws AnalysisException {
        DatabaseTransactionMgr dbMgr = dbIdToDatabaseTransactionMgrs.get(dbId);
        if (dbMgr == null) {
            return 0;
        }
        return dbMgr.getRunningTxnNums();
    }

    /** Draws the next transaction id from the id generator. */
    @Override
    public Long getNextTransactionId() {
        return idGenerator.getNextTransactionId();
    }

    /**
     * Serializes all transaction states followed by the id generator.
     * The total transaction count is written first so that {@code readFields} knows how many
     * states to read back.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(getTransactionNum());
        for (DatabaseTransactionMgr dbMgr : dbIdToDatabaseTransactionMgrs.values()) {
            dbMgr.unprotectWriteAllTransactionStates(out);
        }
        idGenerator.write(out);
    }

    /**
     * Reads back what {@code write} produced: a transaction count, that many transaction states
     * (each upserted into its database's manager), then the id generator.
     *
     * @throws IOException on read failure, or when a state's database transaction manager cannot
     *                     be obtained
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        for (int remaining = in.readInt(); remaining > 0; remaining--) {
            TransactionState txnState = TransactionState.read(in);
            try {
                getDatabaseTransactionMgr(txnState.getDbId()).unprotectUpsertTransactionState(txnState, true);
            } catch (AnalysisException e) {
                LOG.warn("failed to get db transaction manager for txn: {}", txnState);
                throw new IOException("Read transaction states failed", e);
            }
        }
        idGenerator.readFields(in);
    }

    /**
     * Replays an upserted transaction state into its database transaction manager
     * (idToTransactionState). Note that checkpoint also runs the transaction cleaner, which may
     * concurrently modify that id-to-state mapping.
     *
     * @throws MetaNotFoundException if the database transaction manager cannot be obtained
     */
    @Override
    public void replayUpsertTransactionState(TransactionState transactionState) throws MetaNotFoundException {
        try {
            getDatabaseTransactionMgr(transactionState.getDbId()).replayUpsertTransactionState(transactionState);
        } catch (AnalysisException e) {
            throw new MetaNotFoundException(e);
        }
    }

    /**
     * Replays the deletion of a single transaction state.
     *
     * @deprecated use {@link #replayBatchRemoveTransactions} or
     *             {@link #replayBatchRemoveTransactionV2} instead
     * @throws MetaNotFoundException if the database transaction manager cannot be obtained
     */
    @Override
    @Deprecated
    public void replayDeleteTransactionState(TransactionState transactionState) throws MetaNotFoundException {
        try {
            getDatabaseTransactionMgr(transactionState.getDbId()).replayDeleteTransaction(transactionState);
        } catch (AnalysisException e) {
            throw new MetaNotFoundException(e);
        }
    }

    /**
     * Replays a batch removal that spans several databases. A database whose transaction
     * manager cannot be obtained is logged and skipped; the other databases are still processed.
     */
    @Override
    public void replayBatchRemoveTransactions(BatchRemoveTransactionsOperation operation) {
        for (Map.Entry<Long, List<Long>> entry : operation.getDbTxnIds().entrySet()) {
            Long dbId = entry.getKey();
            try {
                getDatabaseTransactionMgr(dbId).replayBatchRemoveTransaction(entry.getValue());
            } catch (AnalysisException e) {
                LOG.warn("replay batch remove transactions failed. db " + dbId, e);
            }
        }
    }

    /**
     * Replays a single-database batch removal. If the database transaction manager cannot be
     * obtained, the failure is logged and the operation is skipped.
     */
    @Override
    public void replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) {
        try {
            getDatabaseTransactionMgr(operation.getDbId()).replayBatchRemoveTransaction(operation);
        } catch (AnalysisException e) {
            LOG.warn("replay batch remove transactions failed. db " + operation.getDbId(), e);
        }
    }

    /**
     * Registers {@code subTransactionId} under {@code transactionId} in database {@code dbId}.
     * If the database transaction manager cannot be obtained, the failure is only logged.
     */
    @Override
    public void addSubTransaction(long dbId, long transactionId, long subTransactionId) {
        try {
            getDatabaseTransactionMgr(dbId).addSubTransaction(transactionId, subTransactionId);
        } catch (AnalysisException e) {
            LOG.warn("add sub transaction failed. db " + dbId, e);
        }
    }

    /**
     * Unregisters {@code subTransactionId} from database {@code dbId}.
     * If the database transaction manager cannot be obtained, the failure is only logged.
     */
    @Override
    public void removeSubTransaction(long dbId, long subTransactionId) {
        try {
            getDatabaseTransactionMgr(dbId).removeSubTransaction(subTransactionId);
        } catch (AnalysisException e) {
            LOG.warn("remove sub transaction failed. db " + dbId, e);
        }
    }

    /** Always returns 0 in this implementation. */
    @Override
    public int getQueueLength() {
        final int queueLength = 0;
        return queueLength;
    }
}