OlapGroupCommitInsertExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.insert;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/**
 * Insert executor for olap table with group commit
 */
public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(OlapGroupCommitInsertExecutor.class);

    private Backend groupCommitBackend;

    public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
            String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx,
            boolean emptyInsert, Backend backend) {
        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
        this.groupCommitBackend = backend;
    }

    /**
     * check if the sql can run in group commit mode
     */
    public static void fastAnalyzeGroupCommit(ConnectContext ctx, LogicalPlan logicalPlan) {
        try {
            if (ctx.getSessionVariable().isEnableInsertGroupCommit() && !ctx.isTxnModel() && !ctx.getSessionVariable()
                    .isEnableUniqueKeyPartialUpdate()) {
                ctx.setGroupCommit(true);
            }
        } catch (Throwable e) {
            LOG.warn("analyze group commit failed", e);
        }
    }

    /**
     * check if the sql can run in group commit mode
     */
    public static void analyzeGroupCommit(ConnectContext ctx, LogicalPlan logicalPlan) {
        try {
            if (ctx.isGroupCommit()) {
                return;
            }
            if (!ctx.getSessionVariable().isEnableInsertGroupCommit()) {
                return;
            }
            if (logicalPlan instanceof PrepareCommand) {
                logicalPlan = ((PrepareCommand) logicalPlan).getLogicalPlan();
            }
            if (logicalPlan instanceof InsertIntoTableCommand) {
                LogicalPlan logicalQuery = ((InsertIntoTableCommand) logicalPlan).getLogicalQuery();
                TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
                OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf, logicalQuery,
                        Optional.empty());
            }
        } catch (Throwable e) {
            LOG.warn("analyze group commit failed", e);
        }
    }

    protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, LogicalPlan logicalQuery,
            Optional<InsertCommandContext> insertCtx) {
        // The flag is set to false before execute sql, if it is true, this is a http stream
        if (ctx.isGroupCommit()) {
            return;
        }
        if (logicalQuery instanceof UnboundTableSink) {
            UnboundTableSink<?> tableSink = (UnboundTableSink<?>) logicalQuery;
            List<Pair<BooleanSupplier, Supplier<String>>> conditions = new ArrayList<>();
            conditions.add(Pair.of(() -> ctx.getSessionVariable().isEnableInsertGroupCommit(),
                    () -> "group_commit session variable: " + ctx.getSessionVariable().groupCommit));
            conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel"));
            conditions.add(Pair.of(() -> !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(),
                    () -> "enableUniqueKeyPartialUpdate"));
            conditions.add(Pair.of(() -> table instanceof OlapTable,
                    () -> "not olapTable, class: " + table.getClass().getName()));
            conditions.add(Pair.of(() -> ((OlapTable) table).getTableProperty().getUseSchemaLightChange(),
                    () -> "notUseSchemaLightChange"));
            conditions.add(Pair.of(
                    () -> !((OlapTable) table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME),
                    () -> "db is internal"));
            conditions.add(Pair.of(() -> tableSink.getPartitions().isEmpty(),
                    () -> "partitions is empty: " + tableSink.getPartitions()));
            conditions.add(Pair.of(() -> (!(table instanceof MTMV) || MTMVUtil.allowModifyMTMVData(ctx)),
                    () -> "not allowModifyMTMVData"));
            conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
                    && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
            Plan tableSinkChild = tableSink.child();
            conditions.add(Pair.of(() -> tableSinkChild instanceof OneRowRelation
                            || (tableSinkChild instanceof LogicalUnion
                            && tableSinkChild.getExpressions().size() > 0)
                            || tableSinkChild instanceof InlineTable,
                    () -> "should be one row relation or union or inline table, class: "
                            + tableSinkChild.getClass().getName() + (tableSinkChild instanceof LogicalUnion
                            ? ", expression size is 0" : "")));
            ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
            if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
                for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
                    if (pair.first.getAsBoolean() == false) {
                        LOG.debug("group commit is off for query_id: {}, table: {}, because: {}",
                                DebugUtil.printId(ctx.queryId()), table.getName(), pair.second.get());
                        break;
                    }
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("group commit is off for query_id: {}, table: {}, because logicalQuery class: {}",
                        DebugUtil.printId(ctx.queryId()), table.getName(), logicalQuery.getClass().getName());
            }
        }
    }

    @Override
    protected void beforeExec() {
        if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
            String msg = "insert table " + this.table.getId() + GroupCommitPlanner.SCHEMA_CHANGE;
            LOG.info(msg);
            throw new AnalysisException(msg);
        }
        // this is used for old coordinator
        this.coordinator.setGroupCommitBe(groupCommitBackend);
    }

    @Override
    public void beginTransaction() {
    }

    @Override
    protected void onComplete() {
        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
            txnStatus = TransactionStatus.ABORTED;
        } else {
            txnStatus = TransactionStatus.PREPARE;
        }
    }

    @Override
    protected void onFail(Throwable t) {
        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
        String queryId = DebugUtil.printId(ctx.queryId());
        // if any throwable being thrown during insert operation, first we should abort this txn
        LOG.warn("insert [{}] with query id {} failed, url={}", labelName, queryId, coordinator.getTrackingUrl(), t);
        StringBuilder sb = new StringBuilder(t.getMessage());
        if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
            sb.append(". url: ").append(coordinator.getTrackingUrl());
        }
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
    }

    @Override
    protected void afterExec(StmtExecutor executor) {
        labelName = coordinator.getLabel();
        txnId = coordinator.getTxnId();
        setReturnInfo();
    }
}