OlapInsertExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Insert executor for olap table
 */
public class OlapInsertExecutor extends AbstractInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
    protected TransactionStatus txnStatus = TransactionStatus.ABORTED;

    protected OlapTable olapTable;

    /**
     * constructor
     */
    public OlapInsertExecutor(ConnectContext ctx, Table table,
            String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
        this.olapTable = (OlapTable) table;
    }

    @Override
    public void beginTransaction() {
        if (isGroupCommitHttpStream()) {
            LOG.info("skip begin transaction for group commit http stream");
            return;
        }
        try {
            if (DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
                throw new BeginTransactionException("current running txns on db is larger than limit");
            }
            this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                    database.getId(), ImmutableList.of(table.getId()), labelName,
                    new TxnCoordinator(TxnSourceType.FE, 0,
                            FrontendOptions.getLocalHostAddress(),
                            ExecuteEnv.getInstance().getStartupTime()),
                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeoutS());
        } catch (Exception e) {
            throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
        }
    }

    @Override
    public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        OlapTableSink olapTableSink = (OlapTableSink) sink;
        PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink;
        OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse(
                new OlapInsertCommandContext(true));

        boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
                && physicalOlapTableSink.isPartialUpdate()
                && physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT;
        try {
            // TODO refactor this to avoid call legacy planner's function
            long timeout = getTimeout();
            olapTableSink.init(ctx.queryId(), txnId, database.getId(),
                    timeout,
                    ctx.getSessionVariable().getSendBatchParallelism(),
                    false,
                    isStrictMode,
                    timeout, olapInsertCtx);

            // set schema and partition info for tablet id shuffle exchange
            if (fragment.getPlanRoot() instanceof ExchangeNode
                    && fragment.getDataPartition().getType() == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED) {
                DataSink childFragmentSink = fragment.getChild(0).getSink();
                DataStreamSink dataStreamSink = null;
                if (childFragmentSink instanceof MultiCastDataSink) {
                    MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink;
                    int outputExchangeId = (fragment.getPlanRoot()).getId().asInt();
                    // which DataStreamSink link to the output exchangeNode?
                    for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) {
                        int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt();
                        if (outputExchangeId == sinkExchangeId) {
                            dataStreamSink = currentDataStreamSink;
                            break;
                        }
                    }
                    if (dataStreamSink == null) {
                        throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink");
                    }
                } else if (childFragmentSink instanceof DataStreamSink) {
                    dataStreamSink = (DataStreamSink) childFragmentSink;
                } else {
                    throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink);
                }

                dataStreamSink.setTabletSinkSchemaParam(olapTableSink.getOlapTableSchemaParam());
                dataStreamSink.setTabletSinkPartitionParam(olapTableSink.getOlapTablePartitionParam());
                dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
                List<TOlapTableLocationParam> locationParams = olapTableSink.getOlapTableLocationParams();
                dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
                dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
                dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
            }
        } catch (Exception e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        if (!isGroupCommitHttpStream()) {
            TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
            if (state == null) {
                throw new AnalysisException("txn does not exist: " + txnId);
            }
            addTableIndexes(state);
            if (physicalOlapTableSink.isPartialUpdate()) {
                state.setSchemaForPartialUpdate((OlapTable) table);
            }
        }
    }

    protected void addTableIndexes(TransactionState state) {
        state.addTableIndexes((OlapTable) table);
    }

    @Override
    protected void beforeExec() {
        String queryId = DebugUtil.printId(ctx.queryId());
        LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
    }

    @Override
    protected void onComplete() throws UserException {
        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
            try {
                String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
                Env.getCurrentGlobalTransactionMgr().abortTransaction(
                        database.getId(), txnId,
                        (errMsg == null ? "unknown reason" : errMsg));
            } catch (Exception abortTxnException) {
                LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException);
            }
        } else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
                database, Lists.newArrayList((Table) table),
                txnId,
                TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
                ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
            txnStatus = TransactionStatus.VISIBLE;
        } else {
            txnStatus = TransactionStatus.COMMITTED;
        }
        if (Config.isCloudMode()) {
            String clusterName = ctx.getCloudCluster();
            if (ctx.getSessionVariable().enableMultiClusterSyncLoad()
                    && clusterName != null && !clusterName.isEmpty()) {
                CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
                List<List<Backend>> backendsList = infoService
                                                        .getCloudClusterNames()
                                                        .stream()
                                                        .filter(name -> !name.equals(clusterName))
                                                        .map(name -> infoService.getBackendsByClusterName(name))
                                                        .collect(Collectors.toList());
                List<Long> allTabletIds = ((OlapTable) table).getAllTabletIds();
                StmtExecutor.syncLoadForTablets(backendsList, allTabletIds);
            }
        }
    }

    @Override
    protected void onFail(Throwable t) {
        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
        String queryId = DebugUtil.printId(ctx.queryId());
        // if any throwable being thrown during insert operation, first we should abort this txn
        LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
        if (txnId != INVALID_TXN_ID) {
            try {
                Env.getCurrentGlobalTransactionMgr().abortTransaction(
                        database.getId(), txnId, errMsg);
            } catch (Exception abortTxnException) {
                // just print a log if abort txn failed. This failure do not need to pass to user.
                // user only concern abort how txn failed.
                LOG.warn("insert [{}] with query id {} abort txn {} failed",
                        labelName, queryId, txnId, abortTxnException);
            }
        }
        // retry insert into from select when meet E-230 in cloud
        if (Config.isCloudMode() && t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
            return;
        }
        StringBuilder sb = new StringBuilder(t.getMessage());
        if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
            sb.append(". url: ").append(coordinator.getTrackingUrl());
        }
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
    }

    @Override
    protected void afterExec(StmtExecutor executor) {
        // Go here, which means:
        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
        // 2. transaction failed but Config.using_old_load_usage_pattern is true.
        // we will record the load job info for these 2 cases
        try {
            // the statement parsed by Nereids is saved at executor::parsedStmt.
            StatementBase statement = executor.getParsedStmt();
            UserIdentity userIdentity;
            //if we use job scheduler, parse statement will not set user identity,so we need to get it from context
            if (null == statement) {
                userIdentity = ctx.getCurrentUserIdentity();
            } else {
                userIdentity = statement.getUserInfo();
            }
            EtlJobType etlJobType = EtlJobType.INSERT;
            if (0 != jobId) {
                etlJobType = EtlJobType.INSERT_JOB;
            }
            if (!Config.enable_nereids_load) {
                // just record for loadv2 here
                ctx.getEnv().getLoadManager()
                        .recordFinishedLoadJob(labelName, txnId, database.getFullName(),
                                table.getId(),
                                etlJobType, createTime, errMsg,
                                coordinator.getTrackingUrl(), userIdentity, jobId);
            }
        } catch (MetaNotFoundException e) {
            LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
            errMsg = "Record info of insert load with error " + e.getMessage();
        }

        setReturnInfo();
    }

    protected void setReturnInfo() {
        // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
        // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
        StringBuilder sb = new StringBuilder();
        sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
        sb.append("', 'txnId':'").append(txnId).append("'");
        if (table.getType() == TableType.MATERIALIZED_VIEW) {
            sb.append("', 'rows':'").append(loadedRows).append("'");
        }
        if (!Strings.isNullOrEmpty(errMsg)) {
            sb.append(", 'err':'").append(errMsg).append("'");
        }
        sb.append("}");

        ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
        // set insert result in connection context,
        // so that user can use `show insert result` to get info of the last insert operation.
        ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
                txnStatus, loadedRows, filteredRows);
        // update it, so that user can get loaded rows in fe.audit.log
        ctx.updateReturnRows((int) loadedRows);
    }

    public long getTimeout() {
        return ctx.getExecTimeoutS();
    }

    private boolean isGroupCommitHttpStream() {
        return ConnectContext.get() != null && ConnectContext.get().isGroupCommit();
    }
}