RemoteOlapInsertExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.doris.FeServiceClient;
import org.apache.doris.datasource.doris.RemoteDorisExternalCatalog;
import org.apache.doris.datasource.doris.RemoteOlapTable;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.RemoteOlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TAbortRemoteTxnRequest;
import org.apache.doris.thrift.TAbortRemoteTxnResult;
import org.apache.doris.thrift.TBeginRemoteTxnRequest;
import org.apache.doris.thrift.TBeginRemoteTxnResult;
import org.apache.doris.thrift.TCommitRemoteTxnRequest;
import org.apache.doris.thrift.TCommitRemoteTxnResult;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
 * Remote executor for Doris Catalog remote insert.
 * Local insert implementation remains in {@link OlapInsertExecutor}.
 *
 * <p>This executor is responsible for wiring Doris Catalog remote transaction
 * lifecycle (begin / commit / abort) through the catalog's {@link FeServiceClient}
 * and passing dbId / txnId into the sink, while reusing the generic insert
 * execution pipeline defined in {@link AbstractInsertExecutor}.
 */
public class RemoteOlapInsertExecutor extends OlapInsertExecutor {

    private static final Logger LOG = LogManager.getLogger(RemoteOlapInsertExecutor.class);

    /** Fallback text used whenever a failure carries no message of its own. */
    private static final String UNKNOWN_REASON = "unknown reason";

    /**
     * Creates the executor; every argument is forwarded unchanged to {@link OlapInsertExecutor}.
     */
    public RemoteOlapInsertExecutor(ConnectContext ctx, RemoteOlapTable table,
            String labelName, org.apache.doris.nereids.NereidsPlanner planner,
            java.util.Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
            long jobId) {
        super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
    }

    /**
     * Returns the first error message of a non-OK remote status.
     *
     * <p>A non-OK status may arrive with a null or empty message list; indexing it blindly
     * would replace the real failure with an {@link IndexOutOfBoundsException} or
     * {@link NullPointerException}. In that case a message naming the status code is returned.
     *
     * @param code status code reported by the remote FE
     * @param errorMsgs error messages attached to the status, may be null or empty
     * @return the first error message, or a fallback that names {@code code}
     */
    private static String firstRemoteErrorMsg(TStatusCode code, List<String> errorMsgs) {
        if (errorMsgs == null || errorMsgs.isEmpty()) {
            return "remote FE returned status " + code + " without error message";
        }
        return errorMsgs.get(0);
    }

    /**
     * Begins a transaction through the remote catalog's FE client and stores the returned txn id.
     *
     * @throws AnalysisException if the RPC fails or the remote FE answers with a non-OK status;
     *         the status-specific exception is kept as the cause
     */
    @Override
    public void beginTransaction() {
        RemoteDorisExternalCatalog remoteCatalog = ((RemoteOlapTable) table).getCatalog();
        FeServiceClient client = remoteCatalog.getFeServiceClient();
        String remoteDbName = database.getFullName();
        String remoteTableName = table.getName();

        TBeginRemoteTxnRequest request = new TBeginRemoteTxnRequest();
        request.setDb(remoteDbName);
        request.setTbl(remoteTableName);
        request.setLabel(labelName);
        // getTimeout() is multiplied by 1000 to fill the request's millisecond field.
        // A non-positive value leaves the field unset.
        long timeoutSeconds = getTimeout();
        if (timeoutSeconds > 0) {
            request.setTimeoutMs(timeoutSeconds * 1000L);
        }

        try {
            TBeginRemoteTxnResult result = client.beginRemoteTxn(request);
            TStatusCode code = result.getStatus().getStatusCode();
            if (code != TStatusCode.OK) {
                String remoteErr = firstRemoteErrorMsg(code, result.getStatus().getErrorMsgs());
                // Map the remote status to a specific exception type; the catch below
                // wraps it so that type survives as the cause.
                switch (code) {
                    case NOT_AUTHORIZED:
                        throw new AuthenticationException(remoteErr);
                    case LABEL_ALREADY_EXISTS:
                        throw new LabelAlreadyUsedException(remoteErr);
                    case TOO_MANY_TASKS:
                        throw new BeginTransactionException(remoteErr);
                    case LIMIT_REACH:
                        throw new QuotaExceedException(remoteErr);
                    case NOT_FOUND:
                        throw new MetaNotFoundException(remoteErr);
                    case ANALYSIS_ERROR:
                    case INTERNAL_ERROR:
                    default:
                        throw new AnalysisException(remoteErr);
                }
            }
            this.txnId = result.getTxnId();
            LOG.info("begin remote txn success, catalog={}, db={}, table={}, label={}, txnId={}",
                    remoteCatalog.getName(), remoteDbName, remoteTableName, labelName, txnId);
        } catch (Exception e) {
            throw new AnalysisException("begin remote transaction failed. " + e.getMessage(), e);
        }
    }

    /**
     * Initializes the {@link RemoteOlapTableSink} with the transaction id obtained in
     * {@link #beginTransaction()} and, when the fragment root is an exchange node partitioned
     * by {@code OLAP_TABLE_SINK_HASH_PARTITIONED}, copies the tablet sink parameters onto the
     * child fragment's {@link DataStreamSink}.
     *
     * @param fragment fragment that owns {@code sink}
     * @param sink must be a {@link RemoteOlapTableSink}
     * @param physicalSink must be a {@link PhysicalOlapTableSink}
     * @throws AnalysisException wrapping any failure raised while initializing the sinks
     */
    @Override
    public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        RemoteOlapTableSink remoteOlapTableSink = (RemoteOlapTableSink) sink;
        PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink;
        OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse(
                new OlapInsertCommandContext(true));

        // Strict mode is only requested for partial-update INSERT statements
        // when the session enables insert strictness.
        boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
                && physicalOlapTableSink.isPartialUpdate()
                && physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT;
        try {
            long timeout = getTimeout();
            long dbId = database.getId();
            remoteOlapTableSink.init(ctx.queryId(), txnId, dbId,
                    timeout,
                    ctx.getSessionVariable().getSendBatchParallelism(),
                    false,
                    isStrictMode,
                    timeout, olapInsertCtx);

            if (fragment.getPlanRoot() instanceof ExchangeNode
                    && fragment.getDataPartition().getType() == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED) {
                DataSink childFragmentSink = fragment.getChild(0).getSink();
                DataStreamSink dataStreamSink = null;
                if (childFragmentSink instanceof MultiCastDataSink) {
                    // Pick the multicast branch whose exchange id matches this fragment's root.
                    MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink;
                    int outputExchangeId = (fragment.getPlanRoot()).getId().asInt();
                    for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) {
                        int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt();
                        if (outputExchangeId == sinkExchangeId) {
                            dataStreamSink = currentDataStreamSink;
                            break;
                        }
                    }
                    if (dataStreamSink == null) {
                        throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink");
                    }
                } else if (childFragmentSink instanceof DataStreamSink) {
                    dataStreamSink = (DataStreamSink) childFragmentSink;
                } else {
                    throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink);
                }

                dataStreamSink.setTabletSinkSchemaParam(remoteOlapTableSink.getOlapTableSchemaParam());
                dataStreamSink.setTabletSinkPartitionParam(remoteOlapTableSink.getOlapTablePartitionParam());
                dataStreamSink.setTabletSinkTupleDesc(remoteOlapTableSink.getTupleDescriptor());
                List<TOlapTableLocationParam> locationParams = remoteOlapTableSink.getOlapTableLocationParams();
                dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
                dataStreamSink.setTabletSinkTxnId(remoteOlapTableSink.getTxnId());
                dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
            }
        } catch (Exception e) {
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    /**
     * Finishes the remote transaction once execution has completed.
     *
     * <p>If the session is already in error state the remote transaction is aborted
     * (best-effort) and nothing is committed. Otherwise the transaction is committed through
     * the remote FE and {@code txnStatus} is set to {@code VISIBLE} or {@code COMMITTED}
     * according to the commit result.
     *
     * <p>The transaction was opened via {@link FeServiceClient}, so it must be ended through
     * the same client; the local {@link Env#getCurrentGlobalTransactionMgr()} is never the
     * right place to abort or commit it.
     *
     * @throws UserException if the remote FE rejects the commit or the RPC fails
     */
    @Override
    protected void onComplete() throws UserException {
        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
            String reason = Strings.emptyToNull(ctx.getState().getErrorMessage());
            LOG.warn("insert [{}] is in error state, abort remote txn {} instead of commit. reason: {}",
                    labelName, txnId, reason == null ? UNKNOWN_REASON : reason);
            if (txnId != INVALID_TXN_ID) {
                // Best-effort: abortTransactionOnFail() logs failures instead of throwing.
                abortTransactionOnFail();
            }
            // Never commit a statement that has already failed.
            return;
        }

        RemoteDorisExternalCatalog remoteCatalog = ((RemoteOlapTable) table).getCatalog();
        FeServiceClient client = remoteCatalog.getFeServiceClient();

        TCommitRemoteTxnRequest request = new TCommitRemoteTxnRequest();
        request.setTxnId(txnId);
        request.setDb(database.getFullName());
        request.setTbl(table.getName());
        request.setCommitInfos(coordinator.getCommitInfos());
        request.setInsertVisibleTimeoutMs(ctx.getSessionVariable().getInsertVisibleTimeoutMs());
        try {
            TCommitRemoteTxnResult result = client.commitRemoteTxn(request);
            TStatusCode code = result.getStatus().getStatusCode();
            if (code == TStatusCode.OK) {
                if (result.isTxnStatus()) {
                    txnStatus = TransactionStatus.VISIBLE;
                } else {
                    txnStatus = TransactionStatus.COMMITTED;
                }
                LOG.info("commit remote txn success, catalog={}, dbId={}, txnId={}, status={}",
                                    remoteCatalog.getName(), database.getId(), txnId, txnStatus);
            } else {
                String remoteErr = firstRemoteErrorMsg(code, result.getStatus().getErrorMsgs());
                switch (code) {
                    case NOT_AUTHORIZED:
                        throw new AuthenticationException(remoteErr);
                    default:
                        throw new UserException(remoteErr);
                }
            }
        } catch (UserException e) {
            // Already carries the remote error message; pass through untouched.
            throw e;
        } catch (Exception e) {
            throw new UserException("commit remote transaction failed unexpectedly. " + e.getMessage(), e);
        }
    }

    /**
     * Abort remote transaction when insert into remote Doris table failed.
     * This method is best-effort and will not throw exception to user.
     */
    @Override
    protected void abortTransactionOnFail() {
        RemoteDorisExternalCatalog remoteCatalog = ((RemoteOlapTable) table).getCatalog();
        FeServiceClient client = remoteCatalog.getFeServiceClient();
        TAbortRemoteTxnRequest request = new TAbortRemoteTxnRequest();
        request.setTxnId(txnId);
        request.setDb(database.getFullName());
        try {
            TAbortRemoteTxnResult result = client.abortRemoteTxn(request);
            TStatusCode code = result.getStatus().getStatusCode();
            if (code == TStatusCode.OK) {
                LOG.info("abort remote txn success, catalog={}, txnId={} ",
                        remoteCatalog.getName(), txnId);
            } else {
                LOG.warn("abort remote transaction failed. catalog={}, txnId={}, err={}",
                        remoteCatalog.getName(), txnId,
                        firstRemoteErrorMsg(code, result.getStatus().getErrorMsgs()));
            }
        } catch (Exception e) {
            LOG.warn("abort remote transaction failed unexpectedly. catalog={}, txnId={}, err={}",
                    remoteCatalog.getName(), txnId, e.getMessage(), e);
        }
    }

    /**
     * Builds the error text reported to the client from the failure itself plus, when the
     * coordinator provides them, its first error message (abbreviated to
     * {@code Config.first_error_msg_max_length}) and its tracking url.
     *
     * @param t the failure that ended the insert
     * @return the message produced by {@link InsertUtils#getFinalErrorMsg}
     */
    private String buildFinalErrorMessage(Throwable t) {
        String localErrMsg = t.getMessage() == null ? UNKNOWN_REASON : t.getMessage();
        String firstErrorMsgPart = "";
        String urlPart = "";
        if (!Strings.isNullOrEmpty(coordinator.getFirstErrorMsg())) {
            firstErrorMsgPart = StringUtils.abbreviate(coordinator.getFirstErrorMsg(),
                    Config.first_error_msg_max_length);
        }
        if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
            urlPart = coordinator.getTrackingUrl();
        }
        return InsertUtils.getFinalErrorMsg(localErrMsg, firstErrorMsgPart, urlPart);
    }

    /**
     * Records the failure, aborts the remote transaction if one was started, and puts the
     * final error message into the session state.
     *
     * @param t the failure that ended the insert
     */
    @Override
    protected void onFail(Throwable t) {
        errMsg = t.getMessage() == null ? UNKNOWN_REASON : t.getMessage();
        String queryId = DebugUtil.printId(ctx.queryId());
        LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
        if (txnId != INVALID_TXN_ID) {
            try {
                abortTransactionOnFail();
            } catch (Exception abortTxnException) {
                LOG.warn("insert [{}] with query id {} abort txn {} failed",
                        labelName, queryId, txnId, abortTxnException);
            }
        }
        String finalErrorMsg = buildFinalErrorMessage(t);
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, finalErrorMsg);
    }

    /**
     * Records the finished load job through the remote catalog's FE client (only when
     * {@code Config.enable_nereids_load} is off and a job id was assigned) and then fills
     * in the statement's return info.
     *
     * @param executor the statement executor driving this insert
     */
    @Override
    protected void afterExec(StmtExecutor executor) {
        // Go here, which means:
        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
        // 2. transaction failed but Config.using_old_load_usage_pattern is true.
        // we will record the load job info for these 2 cases
        try {
            // Do not register job if job id is -1.
            if (!Config.enable_nereids_load && jobId != -1) {
                ((RemoteOlapTable) table).getCatalog().getFeServiceClient().recordFinishedLoadJob(
                        labelName, txnId, database.getFullName(), table.getName(), createTime, errMsg,
                        coordinator.getTrackingUrl(), coordinator.getFirstErrorMsg(), jobId);
            }
        } catch (MetaNotFoundException e) {
            LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
            errMsg = "Record info of insert load with error " + e.getMessage();
        }
        setReturnInfo();
    }
}