// TxnUtil.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.transaction;

import org.apache.doris.cloud.proto.Cloud.RLTaskTxnCommitAttachmentPB;
import org.apache.doris.cloud.proto.Cloud.RoutineLoadProgressPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.EtlStatusPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.FailMsgPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.JobStatePB;
import org.apache.doris.cloud.proto.Cloud.TxnCoordinatorPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
import org.apache.doris.cloud.proto.Cloud.TxnSourceTypePB;
import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.KafkaProgress;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TxnCommitAttachment;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;

/**
 * Static helpers that convert transaction-related objects between their in-memory
 * representation (thrift / FE classes) and the cloud protobuf messages
 * ({@code *PB} types).
 *
 * <p>All methods are static. The enum converters in this class fall back to an
 * {@code UNKNOWN} constant for any value they do not list explicitly.
 */
public class TxnUtil {
    private static final Logger LOG = LogManager.getLogger(TxnUtil.class);

    /**
     * Converts a thrift ETL state to its protobuf counterpart.
     *
     * @param tEtlState the thrift state; must not be {@code null} (the switch would throw)
     * @return the matching protobuf state, or {@code UNKNOWN} for any state other than
     *         RUNNING, FINISHED or CANCELLED
     */
    public static EtlStatusPB.EtlStatePB etlStateToPb(TEtlState tEtlState) {
        switch (tEtlState) {
            case RUNNING:
                return EtlStatusPB.EtlStatePB.RUNNING;
            case FINISHED:
                return EtlStatusPB.EtlStatePB.FINISHED;
            case CANCELLED:
                return EtlStatusPB.EtlStatePB.CANCELLED;
            default:
                return EtlStatusPB.EtlStatePB.UNKNOWN;
        }
    }

    /**
     * Converts a protobuf ETL state to its thrift counterpart.
     *
     * @param etlStatePB the protobuf state; must not be {@code null}
     * @return the matching thrift state, or {@code UNKNOWN} for any state other than
     *         RUNNING, FINISHED or CANCELLED
     */
    public static TEtlState etlStateFromPb(EtlStatusPB.EtlStatePB etlStatePB) {
        switch (etlStatePB) {
            case RUNNING:
                return TEtlState.RUNNING;
            case FINISHED:
                return TEtlState.FINISHED;
            case CANCELLED:
                return TEtlState.CANCELLED;
            default:
                return TEtlState.UNKNOWN;
        }
    }

    /**
     * Converts a load job state to its protobuf counterpart.
     *
     * @param jobState the job state; must not be {@code null}
     * @return the matching protobuf state, or {@code UNKNOWN} for states not listed here
     */
    public static JobStatePB jobStateToPb(JobState jobState) {
        switch (jobState) {
            case PENDING:
                return JobStatePB.PENDING;
            case ETL:
                return JobStatePB.ETL;
            case LOADING:
                return JobStatePB.LOADING;
            case COMMITTED:
                return JobStatePB.COMMITTED;
            case FINISHED:
                return JobStatePB.FINISHED;
            case CANCELLED:
                return JobStatePB.CANCELLED;
            default:
                return JobStatePB.UNKNOWN;
        }
    }

    /**
     * Converts a protobuf load job state back to {@link JobState}.
     *
     * @param jobStatePb the protobuf state; must not be {@code null}
     * @return the matching job state, or {@code UNKNOWN} for states not listed here
     */
    public static JobState jobStateFromPb(JobStatePB jobStatePb) {
        switch (jobStatePb) {
            case PENDING:
                return JobState.PENDING;
            case ETL:
                return JobState.ETL;
            case LOADING:
                return JobState.LOADING;
            case COMMITTED:
                return JobState.COMMITTED;
            case FINISHED:
                return JobState.FINISHED;
            case CANCELLED:
                return JobState.CANCELLED;
            default:
                return JobState.UNKNOWN;
        }
    }

    /**
     * Converts a {@link FailMsg} (message text plus cancel type) to protobuf.
     *
     * @param failMsg the failure message; must not be {@code null}, and its cancel type
     *                must not be {@code null} either (the switch would throw)
     * @return a protobuf message carrying the same text and the matching cancel type;
     *         cancel types not listed here are written as {@code UNKNOWN}
     */
    public static FailMsgPB failMsgToPb(FailMsg failMsg) {
        FailMsgPB.Builder builder = FailMsgPB.newBuilder();
        builder.setMsg(failMsg.getMsg());

        switch (failMsg.getCancelType()) {
            case USER_CANCEL:
                builder.setCancelType(FailMsgPB.CancelTypePB.USER_CANCEL);
                break;
            case ETL_SUBMIT_FAIL:
                builder.setCancelType(FailMsgPB.CancelTypePB.ETL_SUBMIT_FAIL);
                break;
            case ETL_RUN_FAIL:
                builder.setCancelType(FailMsgPB.CancelTypePB.ETL_RUN_FAIL);
                break;
            case ETL_QUALITY_UNSATISFIED:
                builder.setCancelType(FailMsgPB.CancelTypePB.ETL_QUALITY_UNSATISFIED);
                break;
            case LOAD_RUN_FAIL:
                builder.setCancelType(FailMsgPB.CancelTypePB.LOAD_RUN_FAIL);
                break;
            case TIMEOUT:
                builder.setCancelType(FailMsgPB.CancelTypePB.TIMEOUT);
                break;
            case TXN_UNKNOWN:
                builder.setCancelType(FailMsgPB.CancelTypePB.TXN_UNKNOWN);
                break;
            default:
                builder.setCancelType(FailMsgPB.CancelTypePB.UNKNOWN);
                break;
        }
        return builder.build();
    }

    /**
     * Rebuilds a {@link FailMsg} from its protobuf form.
     *
     * @param failMsgPb the protobuf failure message
     * @return a new {@link FailMsg} with the same text and the matching cancel type;
     *         cancel types not listed here become {@code UNKNOWN}
     */
    public static FailMsg failMsgFromPb(FailMsgPB failMsgPb) {
        FailMsg failMsg = new FailMsg();
        failMsg.setMsg(failMsgPb.getMsg());
        switch (failMsgPb.getCancelType()) {
            case USER_CANCEL:
                failMsg.setCancelType(FailMsg.CancelType.USER_CANCEL);
                break;
            case ETL_SUBMIT_FAIL:
                failMsg.setCancelType(FailMsg.CancelType.ETL_SUBMIT_FAIL);
                break;
            case ETL_RUN_FAIL:
                failMsg.setCancelType(FailMsg.CancelType.ETL_RUN_FAIL);
                break;
            case ETL_QUALITY_UNSATISFIED:
                failMsg.setCancelType(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED);
                break;
            case LOAD_RUN_FAIL:
                failMsg.setCancelType(FailMsg.CancelType.LOAD_RUN_FAIL);
                break;
            case TIMEOUT:
                failMsg.setCancelType(FailMsg.CancelType.TIMEOUT);
                break;
            case TXN_UNKNOWN:
                failMsg.setCancelType(FailMsg.CancelType.TXN_UNKNOWN);
                break;
            default:
                failMsg.setCancelType(FailMsg.CancelType.UNKNOWN);
                break;
        }
        return failMsg;
    }

    /**
     * Converts an {@link EtlStatus} (state, tracking URL, stats and counters) to protobuf.
     *
     * @param etlStatus the status to convert; must not be {@code null}
     * @return the populated protobuf message
     */
    public static EtlStatusPB etlStatusToPb(EtlStatus etlStatus) {
        EtlStatusPB.Builder builder = EtlStatusPB.newBuilder();
        builder.setState(TxnUtil.etlStateToPb(etlStatus.getState()))
                .setTrackingUrl(etlStatus.getTrackingUrl())
                .putAllStats(etlStatus.getStats())
                .putAllCounters(etlStatus.getCounters());
        return builder.build();
    }

    /**
     * Rebuilds an {@link EtlStatus} from its protobuf form.
     *
     * @param etlStatusPB the protobuf status
     * @return a new {@link EtlStatus} holding the same state, tracking URL, stats and counters
     */
    public static EtlStatus etlStatusFromPb(EtlStatusPB etlStatusPB) {
        EtlStatus etlStatus = new EtlStatus();

        etlStatus.setState(TxnUtil.etlStateFromPb(etlStatusPB.getState()));
        etlStatus.setTrackingUrl(etlStatusPB.getTrackingUrl());
        // Protobuf exposes map fields as unmodifiable maps; give EtlStatus its own mutable
        // copies so later in-place updates of stats/counters cannot fail.
        etlStatus.setStats(new HashMap<>(etlStatusPB.getStats()));
        etlStatus.setCounters(new HashMap<>(etlStatusPB.getCounters()));
        return etlStatus;
    }

    /**
     * Wraps a {@link LoadJobFinalOperation} into a {@link TxnCommitAttachmentPB} whose type is
     * {@code LODD_JOB_FINAL_OPERATION}.
     *
     * <p>The fail message is only written when the operation has one; every other field is
     * always written.
     *
     * @param loadJobFinalOperation the operation to serialize; must not be {@code null}
     * @return the commit attachment message carrying the operation
     */
    public static TxnCommitAttachmentPB loadJobFinalOperationToPb(LoadJobFinalOperation loadJobFinalOperation) {
        LOG.info("loadJobFinalOperation:{}", loadJobFinalOperation);
        TxnCommitAttachmentPB.Builder attachmentBuilder = TxnCommitAttachmentPB.newBuilder();
        attachmentBuilder.setType(TxnCommitAttachmentPB.Type.LODD_JOB_FINAL_OPERATION);

        TxnCommitAttachmentPB.LoadJobFinalOperationPB.Builder builder =
                TxnCommitAttachmentPB.LoadJobFinalOperationPB.newBuilder();

        builder.setId(loadJobFinalOperation.getId())
                .setLoadingStatus(TxnUtil.etlStatusToPb(loadJobFinalOperation.getLoadingStatus()))
                .setProgress(loadJobFinalOperation.getProgress())
                .setLoadStartTimestamp(loadJobFinalOperation.getLoadStartTimestamp())
                .setFinishTimestamp(loadJobFinalOperation.getFinishTimestamp())
                .setJobState(TxnUtil.jobStateToPb(loadJobFinalOperation.getJobState()));

        if (loadJobFinalOperation.getFailMsg() != null) {
            builder.setFailMsg(TxnUtil.failMsgToPb(loadJobFinalOperation.getFailMsg()));
        }
        // copy into: copy id and the loaded file paths
        builder.setCopyId(loadJobFinalOperation.getCopyId())
                .setLoadFilePaths(loadJobFinalOperation.getLoadFilePaths());

        attachmentBuilder.setLoadJobFinalOperation(builder.build());
        return attachmentBuilder.build();
    }

    /**
     * Wraps a routine load task commit attachment into a {@link TxnCommitAttachmentPB} whose
     * type is {@code RT_TASK_TXN_COMMIT_ATTACHMENT}.
     *
     * <p>The attachment's progress is cast to {@link KafkaProgress} without a type check, so a
     * progress of any other class raises {@link ClassCastException}. The error log URL is only
     * written when it is non-null.
     *
     * @param rtTaskTxnCommitAttachment the attachment to serialize; must not be {@code null}
     * @return the commit attachment message carrying the routine load task data
     */
    public static TxnCommitAttachmentPB rlTaskTxnCommitAttachmentToPb(RLTaskTxnCommitAttachment
            rtTaskTxnCommitAttachment) {
        LOG.info("rtTaskTxnCommitAttachment:{}", rtTaskTxnCommitAttachment);
        TxnCommitAttachmentPB.Builder attachmentBuilder = TxnCommitAttachmentPB.newBuilder();
        attachmentBuilder.setType(TxnCommitAttachmentPB.Type.RT_TASK_TXN_COMMIT_ATTACHMENT);

        RLTaskTxnCommitAttachmentPB.Builder builder =
                RLTaskTxnCommitAttachmentPB.newBuilder();

        UniqueIdPB.Builder taskIdBuilder = UniqueIdPB.newBuilder();
        taskIdBuilder.setHi(rtTaskTxnCommitAttachment.getTaskId().getHi());
        taskIdBuilder.setLo(rtTaskTxnCommitAttachment.getTaskId().getLo());

        RoutineLoadProgressPB.Builder progressBuilder = RoutineLoadProgressPB.newBuilder();
        progressBuilder.putAllPartitionToOffset(((KafkaProgress) rtTaskTxnCommitAttachment.getProgress())
                .getOffsetByPartition());

        builder.setJobId(rtTaskTxnCommitAttachment.getJobId())
                .setTaskId(taskIdBuilder)
                .setFilteredRows(rtTaskTxnCommitAttachment.getFilteredRows())
                .setLoadedRows(rtTaskTxnCommitAttachment.getLoadedRows())
                .setProgress(progressBuilder)
                .setUnselectedRows(rtTaskTxnCommitAttachment.getUnselectedRows())
                .setReceivedBytes(rtTaskTxnCommitAttachment.getReceivedBytes())
                .setTaskExecutionTimeMs(rtTaskTxnCommitAttachment.getTaskExecutionTimeMs());

        if (rtTaskTxnCommitAttachment.getErrorLogUrl() != null) {
            builder.setErrorLogUrl(rtTaskTxnCommitAttachment.getErrorLogUrl());
        }

        attachmentBuilder.setRlTaskTxnCommitAttachment(builder.build());
        return attachmentBuilder.build();
    }

    /**
     * Extracts the routine load task attachment from a commit attachment message.
     *
     * @param txnCommitAttachmentPB the commit attachment message
     * @return a new {@link RLTaskTxnCommitAttachment} built from the embedded routine load message
     */
    public static RLTaskTxnCommitAttachment rtTaskTxnCommitAttachmentFromPb(
            TxnCommitAttachmentPB txnCommitAttachmentPB) {
        RLTaskTxnCommitAttachmentPB rlTaskTxnCommitAttachmentPB = txnCommitAttachmentPB.getRlTaskTxnCommitAttachment();
        if (LOG.isDebugEnabled()) {
            LOG.debug("RLTaskTxnCommitAttachmentPB={}", rlTaskTxnCommitAttachmentPB);
        }
        // Reuse the message extracted above instead of reading it from the wrapper again.
        return new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachmentPB);
    }

    /**
     * Extracts the load job final operation from a commit attachment message.
     *
     * <p>The fail message is {@code null} when the protobuf message does not carry one. The
     * last constructor argument is always a fresh, empty {@link HashMap}.
     *
     * @param txnCommitAttachmentPB the commit attachment message
     * @return a new {@link LoadJobFinalOperation} built from the embedded message
     */
    public static LoadJobFinalOperation loadJobFinalOperationFromPb(TxnCommitAttachmentPB txnCommitAttachmentPB) {
        LoadJobFinalOperationPB loadJobFinalOperationPB = txnCommitAttachmentPB.getLoadJobFinalOperation();
        if (LOG.isDebugEnabled()) {
            LOG.debug("loadJobFinalOperationPB={}", loadJobFinalOperationPB);
        }
        FailMsg failMsg = loadJobFinalOperationPB.hasFailMsg()
                ? TxnUtil.failMsgFromPb(loadJobFinalOperationPB.getFailMsg()) : null;
        return new LoadJobFinalOperation(loadJobFinalOperationPB.getId(),
                TxnUtil.etlStatusFromPb(loadJobFinalOperationPB.getLoadingStatus()),
                loadJobFinalOperationPB.getProgress(), loadJobFinalOperationPB.getLoadStartTimestamp(),
                loadJobFinalOperationPB.getFinishTimestamp(),
                TxnUtil.jobStateFromPb(loadJobFinalOperationPB.getJobState()), failMsg,
                loadJobFinalOperationPB.getCopyId(), loadJobFinalOperationPB.getLoadFilePaths(), new HashMap<>());
    }

    /**
     * Converts a {@link TxnCoordinator} to protobuf. The source type is mapped through its
     * numeric value.
     *
     * @param txnCoordinator the coordinator to convert; must not be {@code null}
     * @return the protobuf coordinator with source type, id, ip and start time copied over
     */
    public static TxnCoordinatorPB txnCoordinatorToPb(TxnCoordinator txnCoordinator) {
        TxnCoordinatorPB.Builder builder = TxnCoordinatorPB.newBuilder();
        builder.setSourceType(TxnSourceTypePB.forNumber(txnCoordinator.sourceType.value()));
        builder.setId(txnCoordinator.id);
        builder.setIp(txnCoordinator.ip);
        builder.setStartTime(txnCoordinator.startTime);
        return builder.build();
    }

    /**
     * Rebuilds a {@link TxnCoordinator} from its protobuf form. The source type is mapped
     * through its numeric value.
     *
     * @param txnCoordinatorPB the protobuf coordinator
     * @return a new {@link TxnCoordinator} with the same source type, id, ip and start time
     */
    public static TxnCoordinator txnCoordinatorFromPb(TxnCoordinatorPB txnCoordinatorPB) {
        return new TxnCoordinator(TxnSourceType.valueOf(txnCoordinatorPB.getSourceType().getNumber()),
                txnCoordinatorPB.getId(), txnCoordinatorPB.getIp(), txnCoordinatorPB.getStartTime());
    }

    /**
     * Builds a {@link TransactionState} from a {@link TxnInfoPB}.
     *
     * <p>Optional parts that are absent from the message are passed on as follows:
     * <ul>
     *   <li>request id, load job source type, coordinator: {@code null}</li>
     *   <li>commit attachment: {@code null} when absent or when its type is neither
     *       {@code LODD_JOB_FINAL_OPERATION} nor {@code RT_TASK_TXN_COMMIT_ATTACHMENT}</li>
     *   <li>prepare / pre-commit / commit / finish time: {@code -1}</li>
     * </ul>
     *
     * @param txnInfo the protobuf transaction info
     * @return the reconstructed transaction state
     */
    public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("txnInfo={}", txnInfo);
        }
        long dbId = txnInfo.getDbId();
        List<Long> tableIdList = txnInfo.getTableIdsList();
        long transactionId = txnInfo.getTxnId();
        String label = txnInfo.getLabel();

        TUniqueId requestId = null;
        if (txnInfo.hasRequestId()) {
            requestId = new TUniqueId(txnInfo.getRequestId().getHi(), txnInfo.getRequestId().getLo());
        }

        LoadJobSourceType loadJobSourceType = null;
        if (txnInfo.hasLoadJobSourceType()) {
            loadJobSourceType = LoadJobSourceType.valueOf(txnInfo.getLoadJobSourceType().getNumber());
        }
        TxnCoordinator txnCoordinator = null;
        if (txnInfo.hasCoordinator()) {
            txnCoordinator = TxnUtil.txnCoordinatorFromPb(txnInfo.getCoordinator());
        }

        TransactionStatus transactionStatus = TransactionStatus.valueOf(txnInfo.getStatus().getNumber());
        String reason = txnInfo.getReason();
        long callbackId = txnInfo.getListenerId();
        long timeoutMs = txnInfo.getTimeoutMs();

        TxnCommitAttachment commitAttachment = null;
        if (txnInfo.hasCommitAttachment()) {
            TxnCommitAttachmentPB attachmentPb = txnInfo.getCommitAttachment();
            TxnCommitAttachmentPB.Type attachmentType = attachmentPb.getType();
            // The two attachment kinds are mutually exclusive; any other type leaves the
            // attachment null.
            if (attachmentType == TxnCommitAttachmentPB.Type.LODD_JOB_FINAL_OPERATION) {
                commitAttachment = TxnUtil.loadJobFinalOperationFromPb(attachmentPb);
            } else if (attachmentType == TxnCommitAttachmentPB.Type.RT_TASK_TXN_COMMIT_ATTACHMENT) {
                commitAttachment = TxnUtil.rtTaskTxnCommitAttachmentFromPb(attachmentPb);
            }
        }

        // -1 marks a timestamp that is not present in the message.
        long prepareTime = txnInfo.hasPrepareTime() ? txnInfo.getPrepareTime() : -1;
        long preCommitTime = txnInfo.hasPrecommitTime() ? txnInfo.getPrecommitTime() : -1;
        long commitTime = txnInfo.hasCommitTime() ? txnInfo.getCommitTime() : -1;
        long finishTime = txnInfo.hasFinishTime() ? txnInfo.getFinishTime() : -1;

        TransactionState transactionState = new TransactionState(
                dbId,
                tableIdList,
                transactionId,
                label,
                requestId,
                loadJobSourceType,
                txnCoordinator,
                transactionStatus,
                reason,
                callbackId,
                timeoutMs,
                commitAttachment,
                prepareTime,
                preCommitTime,
                commitTime,
                finishTime
        );
        if (LOG.isDebugEnabled()) {
            LOG.debug("transactionState={}", transactionState);
        }
        return transactionState;
    }

}