TransactionEntry.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTxnLoadInfo;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.SubTransactionState.SubTransactionType;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TransactionEntry {

    private static final Logger LOG = LogManager.getLogger(TransactionEntry.class);

    private String label = "";
    private DatabaseIf database;
    private long dbId = -1;

    // for insert into values for one table
    private Table table;
    private Backend backend;
    private TTxnParams txnConf;
    private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
    private long rowsInTransaction = 0;
    private Types.PUniqueId pLoadId;
    private boolean isFirstTxnInsert = false;
    private volatile int txnSchemaVersion = -1;

    // for insert into select for multi tables
    private boolean isTransactionBegan = false;
    private long transactionId = -1;
    private TransactionState transactionState;
    private long timeoutTimestamp = -1;
    private List<SubTransactionState> subTransactionStates = new ArrayList<>();
    // Used for cloud mode, including all successful or failed sub transactions except the first one
    private long allSubTxnNum = 0;

    public TransactionEntry() {
    }

    public TransactionEntry(TTxnParams txnConf, Database db, Table table) {
        this.txnConf = txnConf;
        this.database = db;
        this.dbId = this.database.getId();
        this.table = table;
    }

    public String getLabel() {
        return label;
    }

    public void setLabel(String label) {
        this.label = label;
    }

    public DatabaseIf getDb() {
        return database;
    }

    public void setDb(Database db) {
        this.database = db;
        this.dbId = this.database.getId();
    }

    public Table getTable() {
        return table;
    }

    public void setTable(Table table) {
        this.table = table;
    }

    public Backend getBackend() {
        return backend;
    }

    public void setBackend(Backend backend) {
        this.backend = backend;
    }

    public TTxnParams getTxnConf() {
        return txnConf;
    }

    public void setTxnConf(TTxnParams txnConf) {
        this.txnConf = txnConf;
    }

    public boolean isTxnModel() {
        return txnConf != null && txnConf.isNeedTxn();
    }

    public boolean isInsertValuesTxnIniting() {
        return isTxnModel() && txnConf.getTxnId() == -1;
    }

    private boolean isInsertValuesTxnBegan() {
        return isTxnModel() && txnConf.getTxnId() != -1;
    }

    public List<InternalService.PDataRow> getDataToSend() {
        return dataToSend;
    }

    public void setDataToSend(List<InternalService.PDataRow> dataToSend) {
        this.dataToSend = dataToSend;
    }

    public void clearDataToSend() {
        dataToSend.clear();
    }

    public long getRowsInTransaction() {
        return rowsInTransaction;
    }

    public void setRowsInTransaction(long rowsInTransaction) {
        this.rowsInTransaction = rowsInTransaction;
    }

    public Types.PUniqueId getpLoadId() {
        return pLoadId;
    }

    public void setpLoadId(Types.PUniqueId pLoadId) {
        this.pLoadId = pLoadId;
    }

    public boolean isFirstTxnInsert() {
        return isFirstTxnInsert;
    }

    public void setFirstTxnInsert(boolean firstTxnInsert) {
        isFirstTxnInsert = firstTxnInsert;
    }

    public int getTxnSchemaVersion() {
        return txnSchemaVersion;
    }

    public void setTxnSchemaVersion(int txnSchemaVersion) {
        this.txnSchemaVersion = txnSchemaVersion;
    }

    // Used for insert into select, return the sub_txn_id for this insert
    public long beginTransaction(TableIf table, SubTransactionType subTransactionType) throws Exception {
        if (isInsertValuesTxnBegan()) {
            // FIXME: support mix usage of `insert into values` and `insert into select`
            throw new AnalysisException(
                    "Transaction insert can not insert into values and insert into select at the same time");
        }
        DatabaseIf database = table.getDatabase();
        if (!isTransactionBegan) {
            long timeoutSecond = ConnectContext.get().getExecTimeoutS();
            this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000;
            if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                        database.getId(), Lists.newArrayList(table.getId()), label,
                        new TxnCoordinator(TxnSourceType.FE, 0,
                                FrontendOptions.getLocalHostAddress(),
                                ExecuteEnv.getInstance().getStartupTime()),
                        LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
            } else {
                String token = Env.getCurrentEnv().getTokenManager().acquireToken();
                MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ConnectContext.get());
                TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
                request.setDb(database.getFullName()).setTbl(table.getName()).setToken(token)
                        .setLabel(label).setUser("").setUserIp("").setPasswd("").setTimeout(timeoutSecond);
                TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
                this.transactionId = result.getTxnId();
            }
            this.isTransactionBegan = true;
            this.database = database;
            this.dbId = this.database.getId();
            this.transactionState = Env.getCurrentGlobalTransactionMgr()
                    .getTransactionState(database.getId(), transactionId);
            return this.transactionId;
        } else {
            if (this.database.getId() != database.getId()) {
                throw new AnalysisException(
                        "Transaction insert must be in the same database, expect db_id=" + this.database.getId());
            }
            // for delete type, make sure there is no insert for the same table
            if (subTransactionType == SubTransactionType.DELETE && subTransactionStates.stream()
                    .anyMatch(s -> s.getTable().getId() == table.getId()
                            && s.getSubTransactionType() == SubTransactionType.INSERT)) {
                throw new AnalysisException("Can not delete because there is a insert operation for the same table");
            }
            long subTxnId;
            if (Config.isCloudMode()) {
                TUniqueId queryId = ConnectContext.get().queryId();
                String label = String.format("tl_%x_%x", queryId.hi, queryId.lo);
                Set<Long> tableIds = getTableIds();
                tableIds.add(table.getId());
                Pair<Long, TransactionState> pair
                        = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).beginSubTxn(
                        transactionId, table.getDatabase().getId(), tableIds, label, allSubTxnNum);
                this.transactionState = pair.second;
                subTxnId = pair.first;
            } else {
                subTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
                this.transactionState.addTableId(table.getId());
            }
            Env.getCurrentGlobalTransactionMgr().addSubTransaction(database.getId(), transactionId, subTxnId);
            allSubTxnNum++;
            return subTxnId;
        }
    }

    public TransactionStatus commitTransaction() throws Exception {
        if (isTransactionBegan) {
            try {
                // cloud mode does not commit on observer fe because CloudGlobalTransactionMgr#afterCommitTxnResp
                // will produce event
                if (Env.getCurrentEnv().isMaster()) {
                    beforeFinishTransaction();
                    // the report_tablet_interval_seconds is default 60
                    long commitTimeout = Math.min(Config.commit_timeout_second * 1000L,
                            Math.max(timeoutTimestamp - System.currentTimeMillis(), 0));
                    if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId,
                            subTransactionStates, commitTimeout)) {
                        return TransactionStatus.VISIBLE;
                    } else {
                        return TransactionStatus.COMMITTED;
                    }
                } else {
                    OriginStatement originStmt = new OriginStatement("commit", 0);
                    MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, ConnectContext.get(),
                            RedirectStatus.NO_FORWARD, false);
                    LOG.info("forward commit to Master for txn_id={}", this.transactionId);
                    masterOpExecutor.execute();
                    return waitingTxnVisible(this.dbId, this.transactionId);
                }
            } catch (UserException e) {
                LOG.error("Failed to commit transaction", e);
                try {
                    abortTransaction(e.getMessage());
                } catch (Exception e1) {
                    LOG.error("Failed to abort transaction", e1);
                }
                throw e;
            }
        } else if (isInsertValuesTxnBegan()) {
            InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(this);
            if (dataToSend.size() > 0) {
                // send rest data
                executor.sendData();
            }
            // commit txn
            executor.commitTransaction();
            // wait txn visible
            return waitingTxnVisible(txnConf.getDbId(), txnConf.getTxnId());
        } else {
            LOG.info("No transaction to commit");
            return null;
        }
    }

    private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
        TWaitingTxnStatusResult statusResult = null;
        if (Env.getCurrentEnv().isMaster()) {
            statusResult = Env.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
        } else {
            MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ConnectContext.get());
            statusResult = masterTxnExecutor.getWaitingTxnStatus(request);
        }
        return statusResult;
    }

    public long abortTransaction() throws Exception {
        return abortTransaction("user rollback");
    }

    private long abortTransaction(String reason) throws Exception {
        if (isTransactionBegan) {
            if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
                beforeFinishTransaction();
                Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason);
                return transactionId;
            } else {
                OriginStatement originStmt = new OriginStatement("rollback", 0);
                MasterOpExecutor masterOpExecutor = new MasterOpExecutor(originStmt, ConnectContext.get(),
                        RedirectStatus.NO_FORWARD, false);
                LOG.info("forward rollback to Master for txn_id={}", transactionId);
                masterOpExecutor.execute();
                return transactionId;
            }
        } else if (isInsertValuesTxnBegan()) {
            InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(this);
            executor.abortTransaction();
            return txnConf.getTxnId();
        } else {
            LOG.info("No transaction to abort");
            return -1;
        }
    }

    private void beforeFinishTransaction() throws DdlException {
        if (isTransactionBegan) {
            if (Config.isCloudMode()) {
                // null if this is observer
                if (transactionState == null) {
                    database = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
                    transactionState = Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
                }
            } else {
                List<Long> tableIds = transactionState.getTableIdList().stream().distinct()
                        .collect(Collectors.toList());
                transactionState.setTableIdList(tableIds);
            }
            LOG.info("subTransactionStates={}", subTransactionStates);
            transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
                    .collect(Collectors.toList()));
        }
    }

    public long getTransactionId() {
        if (isTransactionBegan) {
            return transactionId;
        } else if (isInsertValuesTxnBegan()) {
            return txnConf.getTxnId();
        } else {
            return -1;
        }
    }

    public void abortSubTransaction(long subTransactionId, Table table) {
        if (isTransactionBegan) {
            if (Config.isCloudMode()) {
                try {
                    Set<Long> tableIds = getTableIds();
                    this.transactionState
                            = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()).abortSubTxn(
                            transactionId, subTransactionId, table.getDatabase().getId(), tableIds, allSubTxnNum);
                } catch (UserException e) {
                    LOG.error("Failed to remove table_id={} from txn_id={}", table.getId(), transactionId, e);
                }
            } else {
                this.transactionState.removeTableId(table.getId());
            }
            Env.getCurrentGlobalTransactionMgr().removeSubTransaction(table.getDatabase().getId(), subTransactionId);
        }
    }

    public void addTabletCommitInfos(long subTxnId, Table table, List<TTabletCommitInfo> commitInfos,
            SubTransactionType subTransactionType) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, commit_infos={}",
                    label, transactionId, subTxnId, table, commitInfos);
        }
        subTransactionStates.add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType));
    }

    public boolean isTransactionBegan() {
        return this.isTransactionBegan;
    }

    public long getTimeout() {
        return (timeoutTimestamp - System.currentTimeMillis()) / 1000;
    }

    private TransactionStatus waitingTxnVisible(long dbId, long transactionId) throws Exception {
        // wait txn visible
        TWaitingTxnStatusRequest request = new TWaitingTxnStatusRequest();
        request.setDbId(dbId).setTxnId(transactionId);
        request.setLabelIsSet(false);
        request.setTxnIdIsSet(true);

        TWaitingTxnStatusResult statusResult = getWaitingTxnStatus(request);
        TransactionStatus txnStatus = TransactionStatus.valueOf(statusResult.getTxnStatusId());
        if (txnStatus == TransactionStatus.COMMITTED) {
            throw new AnalysisException("transaction commit successfully, BUT data will be visible later.");
        } else if (txnStatus != TransactionStatus.VISIBLE) {
            String errMsg = "commit failed, rollback.";
            if (statusResult.getStatus().isSetErrorMsgs()
                    && statusResult.getStatus().getErrorMsgs().size() > 0) {
                errMsg = String.join(". ", statusResult.getStatus().getErrorMsgs());
            }
            throw new AnalysisException(errMsg);
        }
        return txnStatus;
    }

    /**
     * handle 2 forward cases:
     * 1. the first dml stmt in a txn, now the txn is not began: only know the [label]
     *    beginTxn -> txnId
     *    calculate timeoutTimestamp
     *    set dbId
     * 2. the others dml stmts in a txn, now the txn is already began: know the [label, txnId, dbId, timeoutTimestamp]
     */
    public void setTxnInfoInMaster(TTxnLoadInfo txnLoadInfo) throws DdlException {
        this.setTxnConf(new TTxnParams().setNeedTxn(true).setTxnId(-1));
        this.label = txnLoadInfo.getLabel();
        if (txnLoadInfo.isSetTxnId()) {
            Preconditions.checkState(subTransactionStates.isEmpty(),
                    "subTxnStates is not empty: " + subTransactionStates);
            resetByTxnInfo(txnLoadInfo);
            if (this.transactionId > 0) {
                this.transactionState = Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
                Preconditions.checkNotNull(this.transactionState,
                        "db_id=" + dbId + ", txn_id=" + transactionId + " not found");
                Preconditions.checkState(this.label.equals(this.transactionState.getLabel()), "expected label="
                        + this.label + ", real label=" + this.transactionState.getLabel());
                this.isTransactionBegan = true;
            }
        }
        LOG.info("set txn info in master, label={}, txnId={}, dbId={}, timeoutTimestamp={}, allSubTxnNum={}, "
                + "subTxnStates={}", label, transactionId, dbId, timeoutTimestamp, allSubTxnNum, subTransactionStates);
    }

    public TTxnLoadInfo getTxnInfoInMaster() {
        TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
        LOG.info("master return txn info: {}", txnLoadInfo);
        return txnLoadInfo;
    }

    public TTxnLoadInfo getTxnLoadInfoInObserver() throws AnalysisException {
        if (isInsertValuesTxnBegan()) {
            throw new AnalysisException(
                    "Transaction insert can not insert into values and insert into select at the same time");
        }
        TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
        LOG.info("get txn load info in observer: {}", txnLoadInfo);
        return txnLoadInfo;
    }

    public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) throws DdlException {
        Preconditions.checkState(txnLoadInfo.getLabel().equals(this.label),
                "expected label=" + this.label + ", real label=" + txnLoadInfo.getLabel());
        subTransactionStates.clear();
        resetByTxnInfo(txnLoadInfo);
        if (this.transactionId > 0) {
            this.isTransactionBegan = true;
        }
        LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, timeoutTimestamp={}, allSubTxnNum={}, "
                + "subTxnStates={}", label, transactionId, dbId, timeoutTimestamp, allSubTxnNum, subTransactionStates);
    }

    private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException {
        if (txnLoadInfo.isSetDbId()) {
            this.dbId = txnLoadInfo.getDbId();
            this.database = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
        }
        if (txnLoadInfo.isSetTxnId()) {
            this.transactionId = txnLoadInfo.getTxnId();
        }
        if (txnLoadInfo.isSetTimeoutTimestamp()) {
            this.timeoutTimestamp = txnLoadInfo.getTimeoutTimestamp();
        }
        if (txnLoadInfo.isSetAllSubTxnNum()) {
            this.allSubTxnNum = txnLoadInfo.getAllSubTxnNum();
        }
        if (txnLoadInfo.isSetSubTxnInfos()) {
            for (TSubTxnInfo subTxnInfo : txnLoadInfo.getSubTxnInfos()) {
                TableIf table = database.getTableOrDdlException(subTxnInfo.getTableId());
                subTransactionStates.add(
                        new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) table,
                                subTxnInfo.getTabletCommitInfos(),
                                SubTransactionState.getSubTransactionType(subTxnInfo.getSubTxnType())));
            }
        }
    }

    private TTxnLoadInfo getTxnLoadInfo() {
        TTxnLoadInfo txnLoadInfo = new TTxnLoadInfo();
        txnLoadInfo.setLabel(label);
        if (this.isTransactionBegan) {
            txnLoadInfo.setTxnId(transactionId);
            txnLoadInfo.setDbId(dbId);
            txnLoadInfo.setTimeoutTimestamp(timeoutTimestamp);
            txnLoadInfo.setAllSubTxnNum(allSubTxnNum);
            for (SubTransactionState subTxnState : subTransactionStates) {
                txnLoadInfo.addToSubTxnInfos(new TSubTxnInfo()
                        .setSubTxnId(subTxnState.getSubTransactionId())
                        .setTableId(subTxnState.getTable().getId())
                        .setTabletCommitInfos(subTxnState.getTabletCommitInfos())
                        .setSubTxnType(SubTransactionState.getSubTransactionType(subTxnState.getSubTransactionType())));
            }
        }
        return txnLoadInfo;
    }

    private Set<Long> getTableIds() {
        return subTransactionStates.stream().map(s -> s.getTable().getId()).collect(Collectors.toSet());
    }
}