StmtExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AnalyzeDBStmt;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.CreateTableLikeStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertOverwriteTableStmt;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.LoadType;
import org.apache.doris.analysis.LockTablesStmt;
import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.PlaceHolderExpr;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.ReplaceTableClause;
import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetOperationStmt;
import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.SetVar.SetVarType;
import org.apache.doris.analysis.ShowStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StmtRewriter;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.SwitchStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TransactionBeginStmt;
import org.apache.doris.analysis.TransactionCommitStmt;
import org.apache.doris.analysis.TransactionRollbackStmt;
import org.apache.doris.analysis.TransactionStmt;
import org.apache.doris.analysis.UnifiedLoadStmt;
import org.apache.doris.analysis.UnlockTablesStmt;
import org.apache.doris.analysis.UnsetVariableStmt;
import org.apache.doris.analysis.UnsupportedStmt;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.analysis.UseStmt;
import org.apache.doris.analysis.WarmUpClusterStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.proto.Cloud.ClusterStatus;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuditLog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.cache.NereidsSqlCacheManager;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadManagerAdapter;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlOkPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.PlanProcess;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundBaseExternalTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.MustFallbackException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Placeholder;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.commands.Redirect;
import org.apache.doris.nereids.trees.plans.commands.TransactionCommand;
import org.apache.doris.nereids.trees.plans.commands.UnsupportedCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.utils.KillUtils;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.InternalService.POutfileWriteSuccessRequest;
import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.BackendService.Client;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TResultFileSink;
import org.apache.doris.thrift.TResultFileSinkOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TSyncLoadForTabletsRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;

import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Do one COM_QUERY process.
// first: Parse receive byte array to statement struct.
// second: Do handle function for statement.
public class StmtExecutor {
    private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);

    private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
    public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
    private static Set<String> blockSqlAstNames = Sets.newHashSet();

    private Pattern beIpPattern = Pattern.compile("\\[(\\d+):");
    private ConnectContext context;
    private final StatementContext statementContext;
    private MysqlSerializer serializer;
    private OriginStatement originStmt;
    private StatementBase parsedStmt;
    private Analyzer analyzer;
    private ProfileType profileType = ProfileType.QUERY;

    @Setter
    private volatile Coordinator coord = null;
    private MasterOpExecutor masterOpExecutor = null;
    private RedirectStatus redirectStatus = null;
    private Planner planner;
    private boolean isProxy;
    private ShowResultSet proxyShowResultSet = null;
    private Data.PQueryStatistics.Builder statisticsForAuditLog;
    private boolean isCached;
    private String stmtName;
    private String prepareStmtName; // for prox
    private String mysqlLoadId;
    // Handle selects that fe can do without be
    private boolean isHandleQueryInFe = false;
    // The profile of this execution
    private final Profile profile;
    private Boolean isForwardedToMaster = null;

    // The result schema if "dry_run_query" is true.
    // Only one column to indicate the real return row numbers.
    private static final CommonResultSetMetaData DRY_RUN_QUERY_METADATA = new CommonResultSetMetaData(
            Lists.newArrayList(new Column("ReturnedRows", PrimitiveType.STRING)));

    // this constructor is mainly for proxy
    public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
        Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
        this.context = context;
        this.originStmt = originStmt;
        this.serializer = context.getMysqlChannel().getSerializer();
        this.isProxy = isProxy;
        this.statementContext = new StatementContext(context, originStmt);
        this.context.setStatementContext(statementContext);
        this.profile = new Profile(
                this.context.getSessionVariable().enableProfile(),
                this.context.getSessionVariable().getProfileLevel(),
                this.context.getSessionVariable().getAutoProfileThresholdMs());
    }

    // for test
    public StmtExecutor(ConnectContext context, String stmt) {
        this(context, new OriginStatement(stmt, 0), false);
        this.stmtName = stmt;
    }

    // constructor for receiving parsed stmt from connect processor
    public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) {
        this.context = ctx;
        this.parsedStmt = parsedStmt;
        this.originStmt = parsedStmt.getOrigStmt();
        if (context.getConnectType() == ConnectType.MYSQL) {
            this.serializer = context.getMysqlChannel().getSerializer();
        } else {
            this.serializer = null;
        }
        this.isProxy = false;
        if (parsedStmt instanceof LogicalPlanAdapter) {
            this.statementContext = ((LogicalPlanAdapter) parsedStmt).getStatementContext();
            this.statementContext.setConnectContext(ctx);
            this.statementContext.setOriginStatement(originStmt);
            this.statementContext.setParsedStatement(parsedStmt);
        } else {
            this.statementContext = new StatementContext(ctx, originStmt);
            this.statementContext.setParsedStatement(parsedStmt);
        }
        this.context.setStatementContext(statementContext);
        this.profile = new Profile(
                            context.getSessionVariable().enableProfile(),
                            context.getSessionVariable().getProfileLevel(),
                            context.getSessionVariable().getAutoProfileThresholdMs());
    }

    public boolean isProxy() {
        return isProxy;
    }

    public static InternalService.PDataRow getRowStringValue(List<Expr> cols,
            FormatOptions options) throws UserException {
        if (cols.isEmpty()) {
            return null;
        }
        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
        for (Expr expr : cols) {
            if (expr instanceof PlaceHolderExpr) {
                expr = ((PlaceHolderExpr) expr).getLiteral();
            }
            if (!expr.isLiteralOrCastExpr()) {
                throw new UserException(
                        "do not support non-literal expr in transactional insert operation: " + expr.toSql());
            }
            row.addColBuilder().setValue(expr.getStringValueForStreamLoad(options));
        }
        return row.build();
    }

    private Map<String, String> getSummaryInfo(boolean isFinished) {
        long currentTimestamp = System.currentTimeMillis();
        SummaryBuilder builder = new SummaryBuilder();
        builder.profileId(DebugUtil.printId(context.queryId()));
        if (Version.DORIS_BUILD_VERSION_MAJOR == 0) {
            builder.dorisVersion(Version.DORIS_BUILD_SHORT_HASH);
        } else {
            builder.dorisVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
        }
        builder.taskType(profileType.name());
        builder.startTime(TimeUtils.longToTimeString(context.getStartTime()));
        // TODO: Never use custom data format when deliverying information between two systems.
        // UI can not order profile by TOTAL_TIME since its not a sortable string (2h1m3s > 2h1s?)
        // to get decoded info, UI need to decode it first, it means others need to
        // reference the implementation of DebugUtil.getPrettyStringMs to figure out the format
        if (isFinished) {
            builder.endTime(TimeUtils.longToTimeString(currentTimestamp));
            builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - context.getStartTime()));
        }
        String taskState = "RUNNING";
        if (isFinished) {
            if (coord != null) {
                taskState = coord.getExecStatus().getErrorCode().name();
            } else {
                taskState = context.getState().toString();
            }
        }
        builder.taskState(taskState);
        builder.user(context.getQualifiedUser());
        builder.defaultCatalog(context.getCurrentCatalog().getName());
        builder.defaultDb(context.getDatabase());
        builder.workloadGroup(context.getWorkloadGroupName());
        builder.sqlStatement(originStmt == null ? "" : originStmt.originStmt);
        builder.isCached(isCached ? "Yes" : "No");

        Map<String, Integer> beToInstancesNum = coord == null ? Maps.newTreeMap() : coord.getBeToInstancesNum();
        builder.totalInstancesNum(String.valueOf(beToInstancesNum.values().stream().reduce(0, Integer::sum)));
        builder.instancesNumPerBe(
                beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue())
                        .collect(Collectors.joining(",")));
        builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
        builder.traceId(context.getSessionVariable().getTraceId());
        builder.isNereids(context.getState().isNereids ? "Yes" : "No");
        return builder.build();
    }

    public Planner planner() {
        return planner;
    }

    public void setPlanner(Planner planner) {
        this.planner = planner;
    }

    public boolean isForwardToMaster() {
        if (isForwardedToMaster == null) {
            isForwardedToMaster = shouldForwardToMaster();
        }
        return isForwardedToMaster;
    }

    private boolean shouldForwardToMaster() {
        if (Env.getCurrentEnv().isMaster()) {
            return false;
        }

        if (Config.enable_bdbje_debug_mode) {
            return false;
        }

        // this is a query stmt, but this non-master FE can not read, forward it to master
        if (isQuery() && !Env.getCurrentEnv().isMaster()
                && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries() || Config.force_forward_all_queries)) {
            return true;
        }

        if (redirectStatus == null) {
            return false;
        } else {
            return redirectStatus.isForwardToMaster();
        }
    }

    private boolean debugForwardAllQueries() {
        DebugPoint debugPoint = DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries");
        return debugPoint != null && debugPoint.param("forwardAllQueries", false);
    }

    public ByteBuffer getOutputPacket() {
        if (masterOpExecutor == null) {
            return null;
        } else {
            return masterOpExecutor.getOutputPacket();
        }
    }

    public ShowResultSet getProxyShowResultSet() {
        return proxyShowResultSet;
    }

    public ShowResultSet getShowResultSet() {
        if (masterOpExecutor == null) {
            return null;
        } else {
            return masterOpExecutor.getProxyResultSet();
        }
    }

    public String getProxyStatus() {
        if (masterOpExecutor == null) {
            return MysqlStateType.UNKNOWN.name();
        }
        return masterOpExecutor.getProxyStatus();
    }

    public int getProxyStatusCode() {
        if (masterOpExecutor == null) {
            return MysqlStateType.UNKNOWN.ordinal();
        }
        return masterOpExecutor.getStatusCode();
    }

    public String getProxyErrMsg() {
        if (masterOpExecutor == null) {
            return MysqlStateType.UNKNOWN.name();
        }
        return masterOpExecutor.getErrMsg();
    }

    public boolean isSyncLoadKindStmt() {
        if (parsedStmt == null) {
            return false;
        }
        if (parsedStmt instanceof LogicalPlanAdapter) {
            LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
            return logicalPlan instanceof InsertIntoTableCommand
                    || logicalPlan instanceof InsertOverwriteTableCommand
                    || (logicalPlan instanceof CreateTableCommand
                    && ((CreateTableCommand) logicalPlan).isCtasCommand())
                    || logicalPlan instanceof DeleteFromCommand;
        }
        return parsedStmt instanceof InsertStmt || parsedStmt instanceof InsertOverwriteTableStmt
                || parsedStmt instanceof CreateTableAsSelectStmt || parsedStmt instanceof DeleteStmt;
    }

    public boolean isAnalyzeStmt() {
        if (parsedStmt == null) {
            return false;
        }
        return parsedStmt instanceof AnalyzeStmt;
    }

    /**
     * Used for audit in ConnectProcessor.
     * <p>
     * TODO: There are three interface in StatementBase be called when doing audit:
     *      toDigest needAuditEncryption when parsedStmt is not a query
     *      and isValuesOrConstantSelect when parsedStmt is instance of InsertStmt.
     *      toDigest: is used to compute Statement fingerprint for blocking some queries
     *      needAuditEncryption: when this interface return true,
     *          log statement use toSql function instead of log original string
     *      isValuesOrConstantSelect: when this interface return true, original string is truncated at 1024
     *
     * @return parsed and analyzed statement for Stale planner.
     *         an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
     */
    public StatementBase getParsedStmt() {
        return parsedStmt;
    }

    public boolean isHandleQueryInFe() {
        return isHandleQueryInFe;
    }

    // query with a random sql
    public void execute() throws Exception {
        UUID uuid = UUID.randomUUID();
        TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        if (Config.enable_print_request_before_execution) {
            LOG.info("begin to execute query {} {}",
                    DebugUtil.printId(queryId), originStmt == null ? "null" : originStmt.originStmt);
        }
        queryRetry(queryId);
    }

    public void queryRetry(TUniqueId queryId) throws Exception {
        TUniqueId firstQueryId = queryId;
        UUID uuid;
        int retryTime = Config.max_query_retry_time;
        retryTime = retryTime <= 0 ? 1 : retryTime + 1;
        // If the query is an `outfile` statement,
        // we execute it only once to avoid exporting redundant data.
        if (parsedStmt instanceof Queriable) {
            retryTime = ((Queriable) parsedStmt).hasOutFileClause() ? 1 : retryTime;
        }
        for (int i = 1; i <= retryTime; i++) {
            try {
                execute(queryId);
                return;
            } catch (UserException e) {
                if (!e.getMessage().contains(FeConstants.CLOUD_RETRY_E230) || i == retryTime) {
                    throw e;
                }
                if (this.coord != null && this.coord.isQueryCancelled()) {
                    throw e;
                }
                TUniqueId lastQueryId = queryId;
                uuid = UUID.randomUUID();
                queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                int randomMillis = 10 + (int) (Math.random() * 10);
                if (i > retryTime / 2) {
                    randomMillis = 20 + (int) (Math.random() * 10);
                }
                if (DebugPointUtil.isEnable("StmtExecutor.retry.longtime")) {
                    randomMillis = 1000;
                }
                LOG.warn("receive E-230 tried={} first queryId={} last queryId={} new queryId={} sleep={}ms",
                        i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId),
                        DebugUtil.printId(queryId), randomMillis);
                Thread.sleep(randomMillis);
                context.getState().reset();
            } catch (Exception e) {
                throw e;
            }
        }
    }

    public void execute(TUniqueId queryId) throws Exception {
        SessionVariable sessionVariable = context.getSessionVariable();
        if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
            context.setReturnResultFromLocal(true);
        }

        try {
            try {
                executeByNereids(queryId);
            } catch (NereidsException | ParseException e) {
                if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
                    MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
                }
                // try to fall back to legacy planner
                if (LOG.isDebugEnabled()) {
                    LOG.debug("nereids cannot process statement\n{}\n because of {}",
                            originStmt.originStmt, e.getMessage(), e);
                }
                // only must fall back + unsupported command could use legacy planner
                if ((e instanceof NereidsException
                        && !(((NereidsException) e).getException() instanceof MustFallbackException))
                        || !((parsedStmt instanceof LogicalPlanAdapter
                        && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof Command))) {
                    LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
                    context.getState().setError(e.getMessage());
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("fall back to legacy planner on statement:\n{}", originStmt.originStmt);
                }
                parsedStmt = null;
                planner = null;
                isForwardedToMaster = null;
                redirectStatus = null;
                // Attention: currently exception from nereids does not mean an Exception to user terminal
                // unless user does not allow fallback to lagency planner. But state of query
                // has already been set to Error in this case, it will have some side effect on profile result
                // and audit log. So we need to reset state to OK if query cancel be processd by lagency.
                context.getState().reset();
                context.getState().setNereids(false);
                executeByLegacy(queryId);
            }
        } finally {
            // revert Session Value
            try {
                VariableMgr.revertSessionValue(sessionVariable);
                // origin value init
                sessionVariable.setIsSingleSetVar(false);
                sessionVariable.clearSessionOriginValue();
            } catch (DdlException e) {
                LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e);
                context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            }
        }
    }

    public void checkBlockRules() throws AnalysisException {
        checkBlockRulesByRegex(originStmt);
        checkBlockRulesByScan(planner);
    }

    public void checkBlockRulesByRegex(OriginStatement originStmt) throws AnalysisException {
        if (originStmt == null) {
            return;
        }
        Env.getCurrentEnv().getSqlBlockRuleMgr().matchSql(
                originStmt.originStmt, context.getSqlHash(), context.getQualifiedUser());
    }

    public void checkBlockRulesByScan(Planner planner) throws AnalysisException {
        if (planner == null) {
            return;
        }
        List<ScanNode> scanNodeList = planner.getScanNodes();
        for (ScanNode scanNode : scanNodeList) {
            if (scanNode instanceof OlapScanNode || scanNode instanceof FileScanNode) {
                Env.getCurrentEnv().getSqlBlockRuleMgr().checkLimitations(
                        scanNode.getSelectedPartitionNum(),
                        scanNode.getSelectedSplitNum(),
                        scanNode.getCardinality(),
                        context.getQualifiedUser());

            }
        }
    }

    private void executeByNereids(TUniqueId queryId) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Nereids start to execute query:\n {}", originStmt.originStmt);
        }
        context.setQueryId(queryId);
        context.setStartTime();
        profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
        List<List<String>> changedSessionVar = VariableMgr.dumpChangedVars(context.getSessionVariable());
        profile.setChangedSessionVar(DebugUtil.prettyPrintChangedSessionVar(changedSessionVar));
        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());

        parseByNereids();
        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
                "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName());
        context.getState().setNereids(true);
        LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
        checkSqlBlocked(logicalPlan.getClass());
        if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
            if (isForwardToMaster()) {
                throw new UserException("Forward master command is not supported for prepare statement");
            }
            if (logicalPlan instanceof UnsupportedCommand || logicalPlan instanceof CreatePolicyCommand) {
                throw new NereidsException(
                        new MustFallbackException("cannot prepare command " + logicalPlan.getClass().getSimpleName()));
            }
            long stmtId = Config.prepared_stmt_start_id > 0
                    ? Config.prepared_stmt_start_id : context.getPreparedStmtId();
            this.prepareStmtName = String.valueOf(stmtId);
            // When proxy executing, this.statementContext is created in constructor.
            // But context.statementContext is created in LogicalPlanBuilder.
            List<Placeholder> placeholders = context == null
                    ? statementContext.getPlaceholders() : context.getStatementContext().getPlaceholders();
            logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, placeholders, originStmt);
        }
        // when we in transaction mode, we only support insert into command and transaction command
        if (context.isTxnModel()) {
            if (!(logicalPlan instanceof BatchInsertIntoTableCommand || logicalPlan instanceof InsertIntoTableCommand
                    || logicalPlan instanceof UpdateCommand || logicalPlan instanceof DeleteFromUsingCommand
                    || logicalPlan instanceof DeleteFromCommand || logicalPlan instanceof TransactionCommand
                    || logicalPlan instanceof UnsupportedCommand)) {
                String errMsg = "This is in a transaction, only insert, update, delete, "
                        + "commit, rollback is acceptable.";
                throw new NereidsException(errMsg, new AnalysisException(errMsg));
            }
        }
        if (logicalPlan instanceof Command) {
            if (logicalPlan instanceof Redirect) {
                OlapGroupCommitInsertExecutor.analyzeGroupCommit(context, logicalPlan);
                redirectStatus = ((Redirect) logicalPlan).toRedirectStatus();
                if (isForwardToMaster()) {
                    // before forward to master, we also need to set profileType in this node
                    if (logicalPlan instanceof InsertIntoTableCommand) {
                        profileType = ProfileType.LOAD;
                    }
                    if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
                        throw new UserException("Forward master command is not supported for prepare statement");
                    }
                    if (isProxy) {
                        // This is already a stmt forwarded from other FE.
                        // If we goes here, means we can't find a valid Master FE(some error happens).
                        // To avoid endless forward, throw exception here.
                        throw new NereidsException(new UserException("The statement has been forwarded to master FE("
                                + Env.getCurrentEnv().getSelfNode().getHost() + ") and failed to execute"
                                + " because Master FE is not ready. You may need to check FE's status"));
                    }
                    forwardToMaster();
                    if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
                        context.setQueryId(masterOpExecutor.getQueryId());
                    }
                    return;
                }
            }

            // Query following createting table would throw table not exist error.
            // For example.
            // t1: client issues create table to master fe
            // t2: client issues query sql to observer fe, the query would fail due to not exist table in plan phase.
            // t3: observer fe receive editlog creating the table from the master fe
            syncJournalIfNeeded();
            try {
                ((Command) logicalPlan).verifyCommandSupported(context);
                ((Command) logicalPlan).run(context, this);
            } catch (MustFallbackException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
                }
                throw new NereidsException("Command(" + originStmt.originStmt + ") process failed.", e);
            } catch (QueryStateException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
                }
                context.setState(e.getQueryState());
                throw new NereidsException("Command(" + originStmt.originStmt + ") process failed",
                        new AnalysisException(e.getMessage(), e));
            } catch (UserException e) {
                // Return message to info client what happened.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
                }
                if (Config.isCloudMode() && e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
                    throw e;
                }
                context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
                throw new NereidsException("Command (" + originStmt.originStmt + ") process failed",
                        new AnalysisException(e.getMessage(), e));
            } catch (Exception e) {
                // Maybe our bug
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
                }
                context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
                throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.",
                        new AnalysisException(e.getMessage() == null ? e.toString() : e.getMessage(), e));
            }
        } else {
            context.getState().setIsQuery(true);
            if (isForwardToMaster()) {
                // some times the follower's meta data is out of date.
                // so we need forward the query to master until the meta data is sync with master
                if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
                    throw new UserException("Forward master command is not supported for prepare statement");
                }
                if (isProxy) {
                    // This is already a stmt forwarded from other FE.
                    // If we goes here, means we can't find a valid Master FE(some error happens).
                    // To avoid endless forward, throw exception here.
                    throw new NereidsException(new UserException("The statement has been forwarded to master FE("
                            + Env.getCurrentEnv().getSelfNode().getHost() + ") and failed to execute"
                            + " because Master FE is not ready. You may need to check FE's status"));
                }
                redirectStatus = RedirectStatus.NO_FORWARD;
                forwardToMaster();
                if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
                    context.setQueryId(masterOpExecutor.getQueryId());
                }
                return;
            }
            // create plan
            // Query following createting table would throw table not exist error.
            // For example.
            // t1: client issues create table to master fe
            // t2: client issues query sql to observer fe, the query would fail due to not exist table in
            //     plan phase.
            // t3: observer fe receive editlog creating the table from the master fe
            syncJournalIfNeeded();
            planner = new NereidsPlanner(statementContext);
            try {
                planner.plan(parsedStmt, context.getSessionVariable().toThrift());
                checkBlockRules();
            } catch (MustFallbackException e) {
                LOG.warn("Nereids plan query failed:\n{}", originStmt.originStmt, e);
                throw new NereidsException("Command(" + originStmt.originStmt + ") process failed.", e);
            } catch (Exception e) {
                LOG.warn("Nereids plan query failed:\n{}", originStmt.originStmt, e);
                throw new NereidsException(new AnalysisException(e.getMessage(), e));
            }
            profile.getSummaryProfile().setQueryPlanFinishTime();
            handleQueryWithRetry(queryId);
        }
    }

    public static void initBlockSqlAstNames() {
        blockSqlAstNames.clear();
        blockSqlAstNames = Pattern.compile(",")
                .splitAsStream(Config.block_sql_ast_names)
                .map(String::trim)
                .collect(Collectors.toSet());
        if (blockSqlAstNames.isEmpty() && !Config.block_sql_ast_names.isEmpty()) {
            blockSqlAstNames.add(Config.block_sql_ast_names);
        }
    }

    public void checkSqlBlocked(Class<?> clazz) throws UserException {
        if (blockSqlAstNames.contains(clazz.getSimpleName())) {
            throw new UserException("SQL is blocked with AST name: " + clazz.getSimpleName());
        }
    }

    private void parseByNereids() {
        if (parsedStmt != null) {
            return;
        }
        List<StatementBase> statements;
        try {
            getProfile().getSummaryProfile().setParseSqlStartTime(System.currentTimeMillis());
            statements = new NereidsParser().parseSQL(originStmt.originStmt, context.getSessionVariable());
            getProfile().getSummaryProfile().setParseSqlFinishTime(System.currentTimeMillis());
        } catch (Exception e) {
            throw new ParseException("Nereids parse failed. " + e.getMessage());
        }
        if (statements.isEmpty()) {
            // for test only
            parsedStmt = new LogicalPlanAdapter(new UnsupportedCommand(), new StatementContext());
        } else {
            if (statements.size() <= originStmt.idx) {
                throw new ParseException("Nereids parse failed. Parser get " + statements.size() + " statements,"
                        + " but we need at least " + originStmt.idx + " statements.");
            }
            parsedStmt = statements.get(originStmt.idx);
        }
    }

    public void finalizeQuery() {
        // The final profile report occurs after be returns the query data, and the profile cannot be
        // received after unregisterQuery(), causing the instance profile to be lost, so we should wait
        // for the profile before unregisterQuery().
        updateProfile(true);
        QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
    }

    private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
        // queue query here
        int retryTime = Config.max_query_retry_time;
        retryTime = retryTime <= 0 ? 1 : retryTime + 1;
        for (int i = 0; i < retryTime; i++) {
            try {
                // reset query id for each retry
                if (i > 0) {
                    UUID uuid = UUID.randomUUID();
                    TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
                            uuid.getLeastSignificantBits());
                    AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
                            DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
                    context.setQueryId(newQueryId);
                    context.setNeedRegenerateInstanceId(newQueryId);
                    if (Config.isCloudMode()) {
                        // sleep random millis [1000, 1500] ms
                        // in the begining of retryTime/2
                        int randomMillis = 1000 + (int) (Math.random() * (1000 - 500));
                        LOG.debug("stmt executor retry times {}, wait randomMillis:{}, stmt:{}",
                                i, randomMillis, originStmt.originStmt);
                        try {
                            if (i > retryTime / 2) {
                                // sleep random millis [2000, 2500] ms
                                // in the ending of retryTime/2
                                randomMillis = 2000 + (int) (Math.random() * (1000 - 500));
                            }
                            Thread.sleep(randomMillis);
                        } catch (InterruptedException e) {
                            LOG.info("stmt executor sleep wait InterruptedException: ", e);
                        }
                    }
                }
                if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
                    context.setReturnResultFromLocal(false);
                }
                handleQueryStmt();
                break;
            } catch (RpcException | UserException e) {
                if (Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
                    throw e;
                }
                // If the previous try is timeout or cancelled, then do not need try again.
                if (this.coord != null && (this.coord.isQueryCancelled() || this.coord.isTimeout())) {
                    throw e;
                }
                LOG.warn("due to exception {} retry {} rpc {} user {}",
                        e.getMessage(), i, e instanceof RpcException, e instanceof UserException);

                boolean isNeedRetry = false;
                if (Config.isCloudMode()) {
                    // cloud mode retry
                    isNeedRetry = false;
                    // errCode = 2, detailMessage = No backend available as scan node,
                    // please check the status of your backends. [10003: not alive]
                    List<String> bes = Env.getCurrentSystemInfo().getAllBackendIds().stream()
                                .map(id -> Long.toString(id)).collect(Collectors.toList());
                    String msg = e.getMessage();
                    if (e instanceof UserException
                            && msg.contains(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG)) {
                        Matcher matcher = beIpPattern.matcher(msg);
                        // here retry planner not be recreated, so
                        // in cloud mode drop node, be id invalid, so need not retry
                        // such as be ids [11000, 11001] -> after drop node 11001
                        // don't need to retry 11001's request
                        if (matcher.find()) {
                            String notAliveBe = matcher.group(1);
                            isNeedRetry = bes.contains(notAliveBe);
                            if (isNeedRetry) {
                                Backend abnormalBe = Env.getCurrentSystemInfo().getBackend(Long.parseLong(notAliveBe));
                                String deadCloudClusterStatus = abnormalBe.getCloudClusterStatus();
                                String deadCloudClusterClusterName = abnormalBe.getCloudClusterName();
                                LOG.info("need retry cluster {} status {}", deadCloudClusterClusterName,
                                        deadCloudClusterStatus);
                                if (Strings.isNullOrEmpty(deadCloudClusterStatus)
                                        || ClusterStatus.valueOf(deadCloudClusterStatus) != ClusterStatus.NORMAL) {
                                    ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                                            .waitForAutoStart(deadCloudClusterClusterName);
                                }
                            }
                        }
                    }
                } else {
                    isNeedRetry = e instanceof RpcException;
                }
                if (i != retryTime - 1 && isNeedRetry
                        && context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
                    LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
                } else {
                    throw e;
                }
            } finally {
                if (context.isReturnResultFromLocal()) {
                    finalizeQuery();
                }
                LOG.debug("Finalize query {}", DebugUtil.printId(context.queryId()));
            }
        }
    }

    // Execute one statement with queryId
    // The queryId will be set in ConnectContext
    // This queryId will also be sent to master FE for exec master only query.
    // query id in ConnectContext will be changed when retry exec a query or master FE return a different one.
    // Exception:
    // IOException: talk with client failed.
    public void executeByLegacy(TUniqueId queryId) throws Exception {
        context.setStartTime();

        profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
        context.setQueryId(queryId);

        // set isQuery first otherwise this state will be lost if some error occurs
        if (parsedStmt instanceof QueryStmt) {
            context.getState().setIsQuery(true);
        }

        try {
            // parsedStmt maybe null here, we parse it. Or the predicate will not work.
            parseByLegacy();
            checkSqlBlocked(parsedStmt.getClass());
            if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt)
                    && !(parsedStmt instanceof TransactionStmt)) {
                throw new TException("This is in a transaction, only insert, update, delete, "
                        + "commit, rollback is acceptable.");
            }
            // support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3);
            analyzeVariablesInStmt();

            if (!context.isTxnModel()) {
                // analyze this query
                analyze(context.getSessionVariable().toThrift());

                if (isForwardToMaster()) {
                    // before forward to master, we also need to set profileType in this node
                    if (parsedStmt instanceof InsertStmt) {
                        InsertStmt insertStmt = (InsertStmt) parsedStmt;
                        if (!insertStmt.getQueryStmt().isExplain()) {
                            profileType = ProfileType.LOAD;
                        }
                    }
                    if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE
                                || context.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
                        throw new UserException("Forward master command is not supported for prepare statement");
                    }
                    if (isProxy) {
                        // This is already a stmt forwarded from other FE.
                        // If goes here, which means we can't find a valid Master FE(some error happens).
                        // To avoid endless forward, throw exception here.
                        throw new UserException("The statement has been forwarded to master FE("
                                + Env.getCurrentEnv().getSelfNode().getHost() + ") and failed to execute"
                                + " because Master FE is not ready. You may need to check FE's status");
                    }
                    forwardToMaster();
                    if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
                        context.setQueryId(masterOpExecutor.getQueryId());
                    }
                    return;
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
                    }
                }
            } else {
                // Query following createting table would throw table not exist error.
                // For example.
                // t1: client issues create table to master fe
                // t2: client issues query sql to observer fe, the query would fail due to not exist table
                //     in plan phase.
                // t3: observer fe receive editlog creating the table from the master fe
                syncJournalIfNeeded();
                analyzer = new Analyzer(context.getEnv(), context);
                parsedStmt.analyze(analyzer);
            }
            parsedStmt.checkPriv();
            // sql/sqlHash block
            checkBlockRules();
            if (parsedStmt instanceof QueryStmt) {
                handleQueryWithRetry(queryId);
            } else if (parsedStmt instanceof SetStmt) {
                handleSetStmt();
            } else if (parsedStmt instanceof UnsetVariableStmt) {
                handleUnsetVariableStmt();
            } else if (parsedStmt instanceof SwitchStmt) {
                handleSwitchStmt();
            } else if (parsedStmt instanceof UseStmt) {
                handleUseStmt();
            }  else if (parsedStmt instanceof UseCloudClusterStmt) {
                // jdbc client use
                handleUseCloudClusterStmt();
            } else if (parsedStmt instanceof TransactionStmt) {
                handleTransactionStmt();
            } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
                handleCtasStmt();
            } else if (parsedStmt instanceof InsertOverwriteTableStmt) {
                handleIotStmt();
            } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass
                InsertStmt insertStmt = (InsertStmt) parsedStmt;
                if (insertStmt.needLoadManager()) {
                    // TODO(tsy): will eventually try to handle native insert and external insert together
                    // add a branch for external load
                    handleExternalInsertStmt();
                } else {
                    try {
                        if (!insertStmt.getQueryStmt().isExplain()) {
                            profileType = ProfileType.LOAD;
                        }
                        handleInsertStmt();
                    } catch (Throwable t) {
                        LOG.warn("handle insert stmt fail: {}", t.getMessage());
                        // the transaction of this insert may already begin, we will abort it at outer finally block.
                        throw t;
                    }
                }
            } else if (parsedStmt instanceof LoadStmt) {
                handleLoadStmt();
            } else if (parsedStmt instanceof UpdateStmt) {
                handleUpdateStmt();
            } else if (parsedStmt instanceof DdlStmt) {
                if (parsedStmt instanceof DeleteStmt) {
                    if (((DeleteStmt) parsedStmt).getInsertStmt() != null) {
                        handleDeleteStmt();
                    } else {
                        Env.getCurrentEnv()
                                .getDeleteHandler()
                                .process((DeleteStmt) parsedStmt, context.getState());
                    }
                } else {
                    handleDdlStmt();
                }
            } else if (parsedStmt instanceof ShowStmt) {
                handleShow();
            } else if (parsedStmt instanceof KillStmt) {
                handleKill();
            } else if (parsedStmt instanceof ExportStmt) {
                handleExportStmt();
            } else if (parsedStmt instanceof UnlockTablesStmt) {
                handleUnlockTablesStmt();
            } else if (parsedStmt instanceof LockTablesStmt) {
                handleLockTablesStmt();
            } else if (parsedStmt instanceof WarmUpClusterStmt) {
                handleWarmUpStmt();
            } else if (parsedStmt instanceof UnsupportedStmt) {
                handleUnsupportedStmt();
            } else if (parsedStmt instanceof AnalyzeStmt) {
                handleAnalyzeStmt();
            } else {
                context.getState().setError(ErrorCode.ERR_NOT_SUPPORTED_YET, "Do not support this query.");
            }
        } catch (IOException e) {
            LOG.warn("execute IOException. {}", context.getQueryIdentifier(), e);
            // the exception happens when interact with client
            // this exception shows the connection is gone
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
            throw e;
        } catch (UserException e) {
            // insert into select
            if (Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
                throw e;
            }
            // analysis exception only print message, not print the stack
            LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
        } catch (JdbcClientException e) {
            LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
                    e.getMessage());
        } catch (Exception e) {
            LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
                    e.getClass().getSimpleName() + ", msg: " + Util.getRootCauseWithSuppressedMessage(e));
            if (parsedStmt instanceof KillStmt) {
                // ignore kill stmt execute err(not monitor it)
                context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
            }
        } finally {
            if (!context.isTxnModel() && parsedStmt instanceof InsertStmt) {
                InsertStmt insertStmt = (InsertStmt) parsedStmt;
                // The transaction of an insert operation begin at analyze phase.
                // So we should abort the transaction at this finally block if it encounters exception.
                if (!insertStmt.needLoadManager() && insertStmt.isTransactionBegin()
                        && context.getState().getStateType() == MysqlStateType.ERR) {
                    try {
                        String errMsg = Strings.emptyToNull(context.getState().getErrorMessage());
                        Env.getCurrentGlobalTransactionMgr().abortTransaction(
                                insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
                                (errMsg == null ? "unknown reason" : errMsg));
                    } catch (Exception abortTxnException) {
                        LOG.warn("errors when abort txn. {}", context.getQueryIdentifier(), abortTxnException);
                    }
                }
            }
        }
    }

    private void syncJournalIfNeeded() throws Exception {
        final Env env = context.getEnv();
        if (env.isMaster() || !context.getSessionVariable().enableStrongConsistencyRead) {
            return;
        }
        new MasterOpExecutor(context).syncJournal();
    }

    /**
     * get variables in stmt.
     */
    private void analyzeVariablesInStmt() throws DdlException {
        analyzeVariablesInStmt(parsedStmt);
    }

    private void analyzeVariablesInStmt(StatementBase statement) throws DdlException {
        SessionVariable sessionVariable = context.getSessionVariable();
        if (statement instanceof SelectStmt) {
            SelectStmt selectStmt = (SelectStmt) statement;
            Map<String, String> optHints = selectStmt.getSelectList().getOptHints();
            if (optHints == null) {
                optHints = new HashMap<>();
            }
            if (optHints != null) {
                sessionVariable.setIsSingleSetVar(true);
                if (selectStmt.isFromInsert()) {
                    optHints.put("enable_page_cache", "false");
                }
                for (String key : optHints.keySet()) {
                    VariableMgr.setVar(sessionVariable, new SetVar(key, new StringLiteral(optHints.get(key))));
                }
            }
        }
    }

    private boolean isQuery() {
        return parsedStmt instanceof QueryStmt
                || (parsedStmt instanceof LogicalPlanAdapter
                && !(((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof Command));
    }

    public boolean isProfileSafeStmt() {
        // fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:131
        // Only generate profile for NereidsPlanner.
        if (!(parsedStmt instanceof LogicalPlanAdapter)) {
            return false;
        }

        LogicalPlan plan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();

        if (plan instanceof InsertIntoTableCommand) {
            LogicalPlan logicalPlan = ((InsertIntoTableCommand) plan).getLogicalQuery();
            // Do not generate profile for insert into t values xxx.
            // t could be an olap-table or an external-table.
            if ((logicalPlan instanceof UnboundTableSink) || (logicalPlan instanceof UnboundBaseExternalTableSink)) {
                if (logicalPlan.children() == null || logicalPlan.children().isEmpty()) {
                    return false;
                }

                for (Plan child : logicalPlan.children()) {
                    // InlineTable means insert into t VALUES xxx.
                    if (child instanceof InlineTable) {
                        return false;
                    }
                }
            }
            return true;
        }

        // Generate profile for:
        // 1. CreateTableCommand(mainly for create as select).
        // 2. LoadCommand.
        // 3. InsertOverwriteTableCommand.
        if ((plan instanceof Command) && !(plan instanceof LoadCommand)
                && !(plan instanceof CreateTableCommand) && !(plan instanceof InsertOverwriteTableCommand)) {
            // Commands like SHOW QUERY PROFILE will not have profile.
            return false;
        } else {
            // 4. For all the other statements.
            return true;
        }
    }

    private void forwardToMaster() throws Exception {
        masterOpExecutor = new MasterOpExecutor(originStmt, context, redirectStatus, isQuery());
        if (LOG.isDebugEnabled()) {
            LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());
        }
        masterOpExecutor.execute();
        if (parsedStmt instanceof LogicalPlanAdapter) {
            // for nereids command
            if (((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof Forward) {
                Forward forward = (Forward) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
                forward.afterForwardToMaster(context);
            }
        } else if (parsedStmt instanceof SetStmt) {
            SetStmt setStmt = (SetStmt) parsedStmt;
            setStmt.modifySetVarsForExecute();
            for (SetVar var : setStmt.getSetVars()) {
                VariableMgr.setVarForNonMasterFE(context.getSessionVariable(), var);
            }
        } else if (parsedStmt instanceof UnsetVariableStmt) {
            UnsetVariableStmt unsetStmt = (UnsetVariableStmt) parsedStmt;
            if (unsetStmt.isApplyToAll()) {
                VariableMgr.setAllVarsToDefaultValue(context.getSessionVariable(), SetType.SESSION);
            } else {
                String defaultValue = VariableMgr.getDefaultValue(unsetStmt.getVariable());
                if (defaultValue == null) {
                    ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_SYSTEM_VARIABLE, unsetStmt.getVariable());
                }
                SetVar var = new SetVar(SetType.SESSION, unsetStmt.getVariable(),
                        new StringLiteral(defaultValue), SetVarType.SET_SESSION_VAR);
                VariableMgr.setVar(context.getSessionVariable(), var);
            }
        }
    }

    public void updateProfile(boolean isFinished) {
        if (!context.getSessionVariable().enableProfile() || !isProfileSafeStmt()) {
            return;
        }
        // If any error happened in update profile, we should ignore this error
        // and ensure the sql is finished normally. For example, if update profile
        // failed, the insert stmt should be success
        try {
            profile.updateSummary(getSummaryInfo(isFinished), isFinished, this.planner);
            if (planner instanceof NereidsPlanner) {
                NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
                profile.setPhysicalPlan(nereidsPlanner.getPhysicalPlan());
            }
        } catch (Throwable t) {
            LOG.warn("failed to update profile, ignore this error", t);
        }
    }

    private boolean hasCloudClusterPriv() {
        String clusterName = "";
        try {
            clusterName = ConnectContext.get().getCloudCluster();
        } catch (ComputeGroupException e) {
            LOG.warn("failed to get cloud cluster", e);
            return false;
        }
        if (ConnectContext.get() == null || Strings.isNullOrEmpty(clusterName)) {
            return false;
        }
        return Env.getCurrentEnv().getAccessManager().checkCloudPriv(ConnectContext.get().getCurrentUserIdentity(),
            clusterName, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER);
    }

    // Analyze one statement to structure in memory.
    public void analyze(TQueryOptions tQueryOptions) throws UserException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(),
                    context.getForwardedStmtId());
        }

        parseByLegacy();

        // yiguolei: insert stmt's grammar analysis will write editlog,
        // so that we check if the stmt should be forward to master here
        // if the stmt should be forward to master, then just return here and the master will do analysis again
        if (isForwardToMaster()) {
            return;
        }

        analyzer = new Analyzer(context.getEnv(), context);

        // Convert show statement to select statement here
        if (parsedStmt instanceof ShowStmt) {
            SelectStmt selectStmt = ((ShowStmt) parsedStmt).toSelectStmt(analyzer);
            if (selectStmt != null) {
                // Need to set origin stmt for new "parsedStmt"(which is selectStmt here)
                // Otherwise, the log printing may result in NPE
                selectStmt.setOrigStmt(parsedStmt.getOrigStmt());
                setParsedStmt(selectStmt);
            }
        }

        // convert unified load stmt here
        if (parsedStmt instanceof UnifiedLoadStmt) {
            // glue code for unified load
            final UnifiedLoadStmt unifiedLoadStmt = (UnifiedLoadStmt) parsedStmt;
            unifiedLoadStmt.init();
            final StatementBase proxyStmt = unifiedLoadStmt.getProxyStmt();
            parsedStmt = proxyStmt;
            if (!(proxyStmt instanceof LoadStmt) && !(proxyStmt instanceof CreateRoutineLoadStmt)) {
                Preconditions.checkState(
                        parsedStmt instanceof InsertStmt,
                        "enable_unified_load=true, should be insert stmt");
            }
        }
        if (parsedStmt instanceof QueryStmt
                || (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager())
                || parsedStmt instanceof CreateTableAsSelectStmt
                || parsedStmt instanceof InsertOverwriteTableStmt) {
            Map<Long, TableIf> tableMap = Maps.newTreeMap();
            QueryStmt queryStmt;
            Set<String> parentViewNameSet = Sets.newHashSet();
            if (parsedStmt instanceof QueryStmt) {
                queryStmt = (QueryStmt) parsedStmt;
                queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
            } else if (parsedStmt instanceof InsertOverwriteTableStmt) {
                InsertOverwriteTableStmt parsedStmt = (InsertOverwriteTableStmt) this.parsedStmt;
                parsedStmt.analyze(analyzer);
                queryStmt = parsedStmt.getQueryStmt();
                queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
            } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
                CreateTableAsSelectStmt parsedStmt = (CreateTableAsSelectStmt) this.parsedStmt;
                queryStmt = parsedStmt.getQueryStmt();
                queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
            } else if (parsedStmt instanceof InsertStmt) {
                InsertStmt insertStmt = (InsertStmt) parsedStmt;
                insertStmt.getTables(analyzer, tableMap, parentViewNameSet);
            }
            // table id in tableList is in ascending order because that table map is a sorted map
            List<TableIf> tables = Lists.newArrayList(tableMap.values());
            tables.sort((Comparator.comparing(TableIf::getId)));
            int analyzeTimes = 2;
            if (Config.isCloudMode()) {
                // be core and be restarted, need retry more times
                analyzeTimes = Math.max(Config.max_query_retry_time / 2, 2);
            }
            for (int i = 1; i <= analyzeTimes; i++) {
                MetaLockUtils.readLockTables(tables);
                try {
                    analyzeAndGenerateQueryPlan(tQueryOptions);
                    break;
                } catch (MVSelectFailedException e) {
                    /*
                     * If there is MVSelectFailedException after the first planner,
                     * there will be error mv rewritten in query.
                     * So, the query should be reanalyzed without mv rewritten and planner again.
                     * Attention: Only error rewritten tuple is forbidden to mv rewrite in the second time.
                     */
                    if (i == analyzeTimes) {
                        throw e;
                    } else {
                        resetAnalyzerAndStmt();
                    }
                } catch (UserException e) {
                    // cloud mode retry, when retry need check this user has cloud cluster auth.
                    // if user doesn't have cloud cluster auth, don't retry, just return.
                    if (Config.isCloudMode()
                            && (e.getMessage().contains(SystemInfoService.NOT_USING_VALID_CLUSTER_MSG)
                            || e.getMessage().contains("backend -1"))
                            && hasCloudClusterPriv()) {
                        LOG.debug("cloud mode analyzeAndGenerateQueryPlan retry times {}", i);
                        // sleep random millis [500, 1000] ms
                        int randomMillis = 500 + (int) (Math.random() * (1000 - 500));
                        try {
                            if (i > analyzeTimes / 2) {
                                randomMillis = 1000 + (int) (Math.random() * (1000 - 500));
                            }
                            Thread.sleep(randomMillis);
                        } catch (InterruptedException ie) {
                            LOG.info("stmt executor sleep wait InterruptedException: ", ie);
                        }
                        if (i < analyzeTimes) {
                            continue;
                        }
                    }
                    throw e;
                } catch (Exception e) {
                    LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
                    throw new AnalysisException("Unexpected exception: " + e.getMessage());
                } finally {
                    MetaLockUtils.readUnlockTables(tables);
                }
            }
        } else {
            try {
                parsedStmt.analyze(analyzer);
            } catch (UserException e) {
                throw e;
            } catch (Exception e) {
                LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
                throw new AnalysisException("Unexpected exception: " + e.getMessage());
            }
        }
    }

    private void parseByLegacy() throws AnalysisException, DdlException {
        // parsedStmt may already by set when constructing this StmtExecutor();
        if (parsedStmt == null) {
            // Parse statement with parser generated by CUP&FLEX
            SqlScanner input = new SqlScanner(new StringReader(originStmt.originStmt),
                    context.getSessionVariable().getSqlMode());
            SqlParser parser = new SqlParser(input);
            try {
                StatementBase parsedStmt = setParsedStmt(SqlParserUtils.getStmt(parser, originStmt.idx));
                parsedStmt.setOrigStmt(originStmt);
                parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            } catch (Error e) {
                LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
                throw new AnalysisException("sql parsing error, please check your sql");
            } catch (AnalysisException e) {
                String syntaxError = parser.getErrorMsg(originStmt.originStmt);
                LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}",
                        originStmt, context.getStmtId(), syntaxError, e);
                if (syntaxError == null) {
                    throw e;
                } else {
                    throw new AnalysisException(syntaxError, e);
                }
            } catch (Exception e) {
                // TODO(lingbin): we catch 'Exception' to prevent unexpected error,
                // should be removed this try-catch clause future.
                LOG.info("unexpected exception happened when parsing stmt {}, id: {}, error: {}",
                        originStmt, context.getStmtId(), parser.getErrorMsg(originStmt.originStmt), e);
                throw new AnalysisException("Unexpected exception: " + e.getMessage());
            }

            analyzeVariablesInStmt();
        }
        if (context.getSessionVariable().isEnableInsertGroupCommit() && parsedStmt instanceof NativeInsertStmt) {
            NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
            nativeInsertStmt.analyzeGroupCommit(new Analyzer(context.getEnv(), context));
        }
        redirectStatus = parsedStmt.getRedirectStatus();
    }

    private void analyzeAndGenerateQueryPlan(TQueryOptions tQueryOptions) throws UserException {
        if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
            QueryStmt queryStmt = null;
            if (parsedStmt instanceof QueryStmt) {
                queryStmt = (QueryStmt) parsedStmt;
            }
            if (parsedStmt instanceof InsertStmt) {
                queryStmt = (QueryStmt) ((InsertStmt) parsedStmt).getQueryStmt();
            }
            if (queryStmt.getOrderByElements() != null && queryStmt.getOrderByElements().isEmpty()) {
                queryStmt.removeOrderByElements();
            }
        }
        parsedStmt.analyze(analyzer);
        if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
            if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).isGroupCommit()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("skip generate query plan for group commit insert");
                }
                return;
            }
            ExprRewriter rewriter = analyzer.getExprRewriter();
            rewriter.reset();
            if (context.getSessionVariable().isEnableFoldConstantByBe()
                    && !context.getSessionVariable().isDebugSkipFoldConstant()) {
                // fold constant expr
                parsedStmt.foldConstant(rewriter, tQueryOptions);
            }
            if (context.getSessionVariable().isEnableRewriteElementAtToSlot()) {
                parsedStmt.rewriteElementAtToSlot(rewriter, tQueryOptions);
            }
            // Apply expr and subquery rewrites.
            ExplainOptions explainOptions = parsedStmt.getExplainOptions();
            boolean reAnalyze = false;

            parsedStmt.rewriteExprs(rewriter);
            reAnalyze = rewriter.changed();
            if (analyzer.containSubquery()) {
                parsedStmt = setParsedStmt(StmtRewriter.rewrite(analyzer, parsedStmt));
                reAnalyze = true;
            }
            if (parsedStmt instanceof SelectStmt) {
                if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)
                        || StmtRewriter.rewriteForRandomDistribution(parsedStmt, analyzer)) {
                    reAnalyze = true;
                }
            }
            if (parsedStmt instanceof SetOperationStmt) {
                List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
                for (SetOperationStmt.SetOperand operand : operands) {
                    if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)
                            || StmtRewriter.rewriteForRandomDistribution(operand.getQueryStmt(), analyzer)) {
                        reAnalyze = true;
                    }
                }
            }
            if (parsedStmt instanceof InsertStmt) {
                QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
                if (queryStmt != null && (StmtRewriter.rewriteByPolicy(queryStmt, analyzer)
                        || StmtRewriter.rewriteForRandomDistribution(queryStmt, analyzer))) {
                    reAnalyze = true;
                }
            }
            if (reAnalyze) {
                // The rewrites should have no user-visible effect. Remember the original result
                // types and column labels to restore them after the rewritten stmt has been
                // reset() and re-analyzed.
                List<Type> origResultTypes = Lists.newArrayList();
                for (Expr e : parsedStmt.getResultExprs()) {
                    origResultTypes.add(e.getType());
                }
                List<String> origColLabels =
                        Lists.newArrayList(parsedStmt.getColLabels());
                // Re-analyze the stmt with a new analyzer.
                analyzer = new Analyzer(context.getEnv(), context);
                // query re-analyze
                parsedStmt.reset();

                analyzer.setReAnalyze(true);
                parsedStmt.analyze(analyzer);

                // Restore the original result types and column labels.
                parsedStmt.castResultExprs(origResultTypes);
                parsedStmt.setColLabels(origColLabels);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("rewrittenStmt: " + parsedStmt.toSql());
                }
                if (explainOptions != null) {
                    parsedStmt.setIsExplain(explainOptions);
                }
            }
        }
        profile.getSummaryProfile().setQueryPlanFinishTime();
    }

    private void resetAnalyzerAndStmt() {
        analyzer = new Analyzer(context.getEnv(), context);

        parsedStmt.reset();

        // DORIS-7361
        // Need to reset selectList before second-round analyze,
        // because exprs in selectList could be rewritten by mvExprRewriter
        // in first-round analyze, which could cause analyze failure.
        if (parsedStmt instanceof QueryStmt) {
            ((QueryStmt) parsedStmt).resetSelectList();
        }

        if (parsedStmt instanceof InsertStmt) {
            ((InsertStmt) parsedStmt).getQueryStmt().resetSelectList();
        }

        if (parsedStmt instanceof CreateTableAsSelectStmt) {
            ((CreateTableAsSelectStmt) parsedStmt).getQueryStmt().resetSelectList();
        }
    }

    // Because this is called by other thread
    public void cancel(Status cancelReason, boolean needWaitCancelComplete) {
        if (masterOpExecutor != null) {
            try {
                masterOpExecutor.cancel();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return;
        }
        Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
        if (insertOverwriteTableCommand.isPresent()) {
            // If the be scheduling has not been triggered yet, cancel the scheduling first
            insertOverwriteTableCommand.get().cancel();
        }
        Coordinator coordRef = coord;
        if (coordRef != null) {
            coordRef.cancel(cancelReason);
        }
        if (mysqlLoadId != null) {
            Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
        }
        if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
            Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
        }
        if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) {
            // Wait for the command to run or cancel completion
            insertOverwriteTableCommand.get().waitNotRunning();
        }
    }

    public void cancel(Status cancelReason) {
        cancel(cancelReason, true);
    }

    private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
        if (parsedStmt instanceof LogicalPlanAdapter) {
            LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
            LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
            if (logicalPlan instanceof InsertOverwriteTableCommand) {
                InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan;
                return Optional.of(insertOverwriteTableCommand);
            }
        }
        return Optional.empty();
    }

    // Handle kill statement.
    private void handleKill() throws UserException {
        KillStmt killStmt = (KillStmt) parsedStmt;
        String queryId = killStmt.getQueryId();
        int id = killStmt.getConnectionId();
        KillUtils.kill(context, killStmt.isConnectionKill(), queryId, id, parsedStmt.getOrigStmt());
    }

    // Process set statement.
    private void handleSetStmt() {
        try {
            SetStmt setStmt = (SetStmt) parsedStmt;
            SetExecutor executor = new SetExecutor(context, setStmt);
            executor.execute();
        } catch (DdlException e) {
            LOG.warn("", e);
            // Return error message to client.
            context.getState().setError(ErrorCode.ERR_LOCAL_VARIABLE, e.getMessage());
            return;
        }
        context.getState().setOk();
    }

    // Process unset variable statement.
    private void handleUnsetVariableStmt() {
        try {
            UnsetVariableStmt unsetStmt = (UnsetVariableStmt) parsedStmt;
            if (unsetStmt.isApplyToAll()) {
                VariableMgr.setAllVarsToDefaultValue(context.getSessionVariable(), unsetStmt.getSetType());
            } else {
                String defaultValue = VariableMgr.getDefaultValue(unsetStmt.getVariable());
                if (defaultValue == null) {
                    ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_SYSTEM_VARIABLE, unsetStmt.getVariable());
                }
                SetVar var = new SetVar(unsetStmt.getSetType(), unsetStmt.getVariable(),
                        new StringLiteral(defaultValue), SetVarType.SET_SESSION_VAR);
                VariableMgr.setVar(context.getSessionVariable(), var);
            }
        } catch (DdlException e) {
            LOG.warn("", e);
            // Return error message to client.
            context.getState().setError(ErrorCode.ERR_LOCAL_VARIABLE, e.getMessage());
            return;
        }
        context.getState().setOk();
    }

    // send values from cache.
    // return true if the meta fields has been sent, otherwise, return false.
    // the meta fields must be sent right before the first batch of data(or eos flag).
    // so if it has data(or eos is true), this method must return true.
    private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues,
            Queriable selectStmt, boolean isSendFields, boolean isEos)
            throws Exception {
        RowBatch batch = null;
        boolean isSend = isSendFields;
        for (InternalService.PCacheValue value : cacheValues) {
            TResultBatch resultBatch = new TResultBatch();
            // need to set empty list first, to support empty result set.
            resultBatch.setRows(Lists.newArrayList());
            for (ByteString one : value.getRowsList()) {
                resultBatch.addToRows(ByteBuffer.wrap(one.toByteArray()));
            }
            resultBatch.setPacketSeq(1);
            resultBatch.setIsCompressed(false);
            batch = new RowBatch();
            batch.setBatch(resultBatch);
            batch.setEos(true);
            if (!isSend) {
                // send meta fields before sending first data batch.
                sendFields(selectStmt.getColLabels(), selectStmt.getFieldInfos(),
                        exprToType(selectStmt.getResultExprs()));
                isSend = true;
            }
            for (ByteBuffer row : batch.getBatch().getRows()) {
                channel.sendOnePacket(row);
            }
            context.updateReturnRows(batch.getBatch().getRows().size());
        }

        if (isEos) {
            if (batch != null) {
                statisticsForAuditLog = batch.getQueryStatistics() == null
                        ? null : batch.getQueryStatistics().toBuilder();
            }
            if (!isSend) {
                sendFields(selectStmt.getColLabels(), selectStmt.getFieldInfos(),
                        exprToType(selectStmt.getResultExprs()));
                isSend = true;
            }
            context.getState().setEof();
        }
        return isSend;
    }

    /**
     * Handle the SelectStmt via Cache.
     */
    private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) throws Exception {
        InternalService.PFetchCacheResult cacheResult = null;
        boolean wantToParseSqlForSqlCache = planner instanceof NereidsPlanner
                && CacheAnalyzer.canUseSqlCache(context.getSessionVariable());
        try {
            cacheResult = cacheAnalyzer.getCacheData();
            if (cacheResult == null) {
                if (ConnectContext.get() != null
                        && !ConnectContext.get().getSessionVariable().testQueryCacheHit.equals("none")) {
                    throw new UserException("The variable test_query_cache_hit is set to "
                            + ConnectContext.get().getSessionVariable().testQueryCacheHit
                            + ", but the query cache is not hit.");
                }
            }
        } finally {
            if (wantToParseSqlForSqlCache) {
                String originStmt = parsedStmt.getOrigStmt().originStmt;
                NereidsSqlCacheManager sqlCacheManager = context.getEnv().getSqlCacheManager();
                if (cacheResult != null) {
                    sqlCacheManager.tryAddBeCache(context, originStmt, cacheAnalyzer);
                }
            }
        }

        Queriable queryStmt = (Queriable) parsedStmt;
        boolean isSendFields = false;
        if (cacheResult != null) {
            isCached = true;
            if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) {
                sendCachedValues(channel, cacheResult.getValuesList(), queryStmt, isSendFields, true);
                return;
            }
        }
        executeAndSendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer, cacheResult);
    }

    // Process a select statement.
    private void handleQueryStmt() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling query {} with query id {}",
                          originStmt.originStmt, DebugUtil.printId(context.queryId));
        }

        if (context.getConnectType() == ConnectType.MYSQL) {
            // Every time set no send flag and clean all data in buffer
            context.getMysqlChannel().reset();
        }

        Queriable queryStmt = (Queriable) parsedStmt;

        if (queryStmt.isExplain()) {
            String explainString = planner.getExplainString(queryStmt.getExplainOptions());
            handleExplainStmt(explainString, false);
            LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
            return;
        }

        if (parsedStmt instanceof LogicalPlanAdapter) {
            LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
            LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
            if (logicalPlan instanceof org.apache.doris.nereids.trees.plans.algebra.SqlCache) {
                isCached = true;
            }
        }

        // handle selects that fe can do without be, so we can make sql tools happy, especially the setup step.
        // TODO FE not support doris field type conversion to arrow field type.
        if (context.supportHandleByFe()) {
            Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
            if (resultSet.isPresent()) {
                sendResultSet(resultSet.get(), ((Queriable) parsedStmt).getFieldInfos());
                isHandleQueryInFe = true;
                LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
                if (context.getSessionVariable().enableProfile()) {
                    if (profile != null) {
                        this.profile.getSummaryProfile().setExecutedByFrontend(true);
                    }
                }
                return;
            }
        }

        MysqlChannel channel = null;
        if (context.getConnectType().equals(ConnectType.MYSQL)) {
            channel = context.getMysqlChannel();
        }
        boolean isOutfileQuery = queryStmt.hasOutFileClause();
        if (parsedStmt instanceof LogicalPlanAdapter) {
            LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
            LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
            if (logicalPlan instanceof org.apache.doris.nereids.trees.plans.algebra.SqlCache) {
                NereidsPlanner nereidsPlanner = (NereidsPlanner) planner;
                PhysicalSqlCache physicalSqlCache = (PhysicalSqlCache) nereidsPlanner.getPhysicalPlan();
                sendCachedValues(channel, physicalSqlCache.getCacheValues(), logicalPlanAdapter, false, true);
                return;
            }
        }

        // Sql and PartitionCache
        CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
        // TODO support arrow flight sql
        // NOTE: If you want to add another condition about SessionVariable, please consider whether
        // add to CacheAnalyzer.commonCacheCondition
        if (channel != null && !isOutfileQuery && CacheAnalyzer.canUseCache(context.getSessionVariable())
                && parsedStmt.getOrigStmt() != null && parsedStmt.getOrigStmt().originStmt != null) {
            if (queryStmt instanceof QueryStmt || queryStmt instanceof LogicalPlanAdapter) {
                handleCacheStmt(cacheAnalyzer, channel);
                LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
                return;
            }
        }

        // handle select .. from xx  limit 0
        // TODO support arrow flight sql
        if (channel != null && parsedStmt instanceof SelectStmt) {
            SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
            if (parsedSelectStmt.getLimit() == 0) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
                }

                sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(), exprToType(queryStmt.getResultExprs()));
                context.getState().setEof();
                LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
                return;
            }
        }

        executeAndSendResult(isOutfileQuery, false, queryStmt, channel, null, null);
        LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
    }

    public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
            Queriable queryStmt, MysqlChannel channel,
            CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception {
        // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
        //    We will not send real query result to client. Instead, we only send OK to client with
        //    number of rows selected. For example:
        //          mysql> select * from tbl1 into outfile xxx;
        //          Query OK, 10 rows affected (0.01 sec)
        //
        // 2. If this is a query, send the result expr fields first, and send result data back to client.
        RowBatch batch;
        CoordInterface coordBase = null;
        if (statementContext.isShortCircuitQuery()) {
            ShortCircuitQueryContext shortCircuitQueryContext =
                        statementContext.getShortCircuitQueryContext() != null
                                ? statementContext.getShortCircuitQueryContext()
                                : new ShortCircuitQueryContext(planner, (Queriable) parsedStmt);
            coordBase = new PointQueryExecutor(shortCircuitQueryContext,
                        context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
            context.getState().setIsQuery(true);
        } else if (planner instanceof NereidsPlanner && ((NereidsPlanner) planner).getDistributedPlans() != null) {
            coord = new NereidsCoordinator(context, analyzer,
                    (NereidsPlanner) planner, context.getStatsErrorEstimator());
            profile.addExecutionProfile(coord.getExecutionProfile());
            QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                    new QueryInfo(context, originStmt.originStmt, coord));
            coordBase = coord;
        } else {
            coord = EnvFactory.getInstance().createCoordinator(
                    context, analyzer, planner, context.getStatsErrorEstimator());
            profile.addExecutionProfile(coord.getExecutionProfile());
            QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                    new QueryInfo(context, originStmt.originStmt, coord));
            coordBase = coord;
        }

        coordBase.setIsProfileSafeStmt(this.isProfileSafeStmt());

        try {
            coordBase.exec();
            profile.getSummaryProfile().setQueryScheduleFinishTime();
            updateProfile(false);

            if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
                Preconditions.checkState(!context.isReturnResultFromLocal());
                profile.getSummaryProfile().setTempStartTime();
                return;
            }

            if (context.isRunProcedure()) {
                // plsql will get the returned results without sending them to mysql client.
                // see org/apache/doris/plsql/executor/DorisRowResult.java
                return;
            }

            boolean isDryRun = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery;
            while (true) {
                // register the fetch result time.
                profile.getSummaryProfile().setTempStartTime();
                batch = coordBase.getNext();
                profile.getSummaryProfile().freshFetchResultConsumeTime();

                // for outfile query, there will be only one empty batch send back with eos flag
                // call `copyRowBatch()` first, because batch.getBatch() may be null, if result set is empty
                if (cacheAnalyzer != null && !isOutfileQuery && !isDryRun) {
                    cacheAnalyzer.copyRowBatch(batch);
                }
                if (batch.getBatch() != null) {
                    // register send field result time.
                    profile.getSummaryProfile().setTempStartTime();
                    // For some language driver, getting error packet after fields packet
                    // will be recognized as a success result
                    // so We need to send fields after first batch arrived
                    if (!isSendFields) {
                        if (!isOutfileQuery) {
                            sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(),
                                    getReturnTypes(queryStmt));
                        } else {
                            if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
                                outfileWriteSuccess(queryStmt.getOutFileClause());
                            }
                            sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
                        }
                        isSendFields = true;
                    }
                    for (ByteBuffer row : batch.getBatch().getRows()) {
                        channel.sendOnePacket(row);
                    }
                    profile.getSummaryProfile().freshWriteResultConsumeTime();
                    context.updateReturnRows(batch.getBatch().getRows().size());
                    context.addResultAttachedInfo(batch.getBatch().getAttachedInfos());
                }
                if (batch.isEos()) {
                    break;
                }
            }
            if (cacheAnalyzer != null && !isDryRun) {
                if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
                    isSendFields =
                            sendCachedValues(channel, cacheResult.getValuesList(), queryStmt, isSendFields,
                                    false);
                }

                cacheAnalyzer.updateCache();

                Cache cache = cacheAnalyzer.getCache();
                if (cache instanceof SqlCache && !cache.isDisableCache() && planner instanceof NereidsPlanner) {
                    String originStmt = parsedStmt.getOrigStmt().originStmt;
                    context.getEnv().getSqlCacheManager().tryAddBeCache(context, originStmt, cacheAnalyzer);
                }
            }
            if (!isSendFields) {
                if (!isOutfileQuery) {
                    if (ConnectContext.get() != null && isDryRun) {
                        // Return a one row one column result set, with the real result number
                        long rows = 0;
                        if (coordBase instanceof Coordinator) {
                            rows = ((Coordinator) coordBase).getNumReceivedRows();
                        } else if (batch.getQueryStatistics() != null) {
                            rows = batch.getQueryStatistics().getReturnedRows();
                        }
                        List<String> data = Lists.newArrayList(String.valueOf(rows));
                        ResultSet resultSet = new CommonResultSet(DRY_RUN_QUERY_METADATA,
                                Collections.singletonList(data));
                        sendResultSet(resultSet);
                        return;
                    } else {
                        sendFields(queryStmt.getColLabels(), queryStmt.getFieldInfos(),
                                getReturnTypes(queryStmt));
                    }
                } else {
                    sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES);
                }
            }

            statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
            context.getState().setEof();
            profile.getSummaryProfile().setQueryFetchResultFinishTime();
        } catch (Exception e) {
            // notify all be cancel running fragment
            // in some case may block all fragment handle threads
            // details see issue https://github.com/apache/doris/issues/16203
            Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR,
                    "cancel fragment query_id:{} cause {}",
                    DebugUtil.printId(context.queryId()), e.getMessage());
            LOG.warn(internalErrorSt.getErrorMsg());
            coordBase.cancel(internalErrorSt);
            throw e;
        } finally {
            coordBase.close();
        }
    }

    private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception {
        // 1. set TResultFileSinkOptions
        TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();

        // 2. set brokerNetAddress
        StorageType storageType = outFileClause.getBrokerDesc() == null
                ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
        if (storageType == StorageType.BROKER) {
            // set the broker address for OUTFILE sink
            String brokerName = outFileClause.getBrokerDesc().getName();
            FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
            sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port)));
        }

        // 3. set TResultFileSink properties
        TResultFileSink sink = new TResultFileSink();
        sink.setFileOptions(sinkOptions);
        sink.setStorageBackendType(storageType.toThrift());

        // 4. get BE
        TNetworkAddress address = null;
        for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) {
            if (be.isAlive()) {
                address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
                break;
            }
        }
        if (address == null) {
            throw new AnalysisException("No Alive backends");
        }

        // 5. send rpc to BE
        POutfileWriteSuccessRequest request = POutfileWriteSuccessRequest.newBuilder()
                .setResultFileSink(ByteString.copyFrom(new TSerializer().serialize(sink))).build();
        Future<POutfileWriteSuccessResult> future = BackendServiceProxy.getInstance()
                .outfileWriteSuccessAsync(address, request);
        POutfileWriteSuccessResult result = future.get();
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        String errMsg;
        if (code != TStatusCode.OK) {
            if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                errMsg = result.getStatus().getErrorMsgsList().get(0);
            } else {
                errMsg = "Outfile write success file failed. backend address: "
                        + NetUtils
                        .getHostPortInAccessibleFormat(address.getHostname(), address.getPort());
            }
            throw new AnalysisException(errMsg);
        }
    }

    private void handleTransactionStmt() throws Exception {
        if (context.getConnectType() == ConnectType.MYSQL) {
            // Every time set no send flag and clean all data in buffer
            context.getMysqlChannel().reset();
        }
        context.getState().setOk(0, 0, "");
        // create plan
        if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0
                && !context.getTxnEntry().isTransactionBegan()
                && (parsedStmt instanceof TransactionCommitStmt || parsedStmt instanceof TransactionRollbackStmt)) {
            context.setTxnEntry(null);
        } else if (parsedStmt instanceof TransactionBeginStmt) {
            if (context.isTxnModel()) {
                LOG.info("A transaction has already begin");
                return;
            }
            if (context.getTxnEntry() == null) {
                context.setTxnEntry(new TransactionEntry());
            }
            context.getTxnEntry()
                    .setTxnConf(new TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("")
                            .setTbl("").setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
                                    : context.getSessionVariable().getInsertMaxFilterRatio()));
            context.getTxnEntry().setFirstTxnInsert(true);
            StringBuilder sb = new StringBuilder();
            sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
                    .append(TransactionStatus.PREPARE.name());
            sb.append("', 'txnId':'").append("'").append("}");
            context.getState().setOk(0, 0, sb.toString());
        } else if (parsedStmt instanceof TransactionCommitStmt) {
            if (!context.isTxnModel()) {
                LOG.info("No transaction to commit");
                return;
            }
            try {
                TransactionEntry txnEntry = context.getTxnEntry();
                TransactionStatus txnStatus = txnEntry.commitTransaction();
                StringBuilder sb = new StringBuilder();
                sb.append("{'label':'").append(txnEntry.getLabel()).append("', 'status':'")
                        .append(txnStatus.name()).append("', 'txnId':'")
                        .append(txnEntry.getTransactionId()).append("'").append("}");
                context.getState().setOk(0, 0, sb.toString());
            } catch (Exception e) {
                LOG.warn("Txn commit failed", e);
                throw new AnalysisException(e.getMessage());
            } finally {
                context.setTxnEntry(null);
            }
        } else if (parsedStmt instanceof TransactionRollbackStmt) {
            if (!context.isTxnModel()) {
                LOG.info("No transaction to rollback");
                return;
            }
            try {
                TransactionEntry txnEntry = context.getTxnEntry();
                long txnId = txnEntry.abortTransaction();
                StringBuilder sb = new StringBuilder();
                sb.append("{'label':'").append(txnEntry.getLabel()).append("', 'status':'")
                        .append(TransactionStatus.ABORTED.name()).append("', 'txnId':'")
                        .append(txnId).append("'").append("}");
                context.getState().setOk(0, 0, sb.toString());
            } catch (Exception e) {
                throw new AnalysisException(e.getMessage());
            } finally {
                context.setTxnEntry(null);
            }
        } else {
            throw new TException("parsedStmt type is not TransactionStmt");
        }
    }

    private int executeForTxn(InsertStmt insertStmt)
            throws UserException, TException, InterruptedException, ExecutionException, TimeoutException {
        if (context.isInsertValuesTxnIniting()) { // first time, begin txn
            beginTxn(insertStmt.getDbName(),
                    insertStmt.getTbl());
        }
        if (!context.getTxnEntry().getTxnConf().getDb().equals(insertStmt.getDbName())
                || !context.getTxnEntry().getTxnConf().getTbl().equals(insertStmt.getTbl())) {
            throw new TException("Only one table can be inserted in one transaction.");
        }

        QueryStmt queryStmt = insertStmt.getQueryStmt();
        if (!(queryStmt instanceof SelectStmt)) {
            throw new TException("queryStmt is not SelectStmt, insert command error");
        }
        if (((NativeInsertStmt) insertStmt).getTargetColumnNames() != null) {
            throw new TException(
                    "The legacy planner does not support specifying column names when using ·insert into values`."
                            + " If you want to specify column names, please `set enable_nereids_planner=true`.");
        }
        TransactionEntry txnEntry = context.getTxnEntry();
        SelectStmt selectStmt = (SelectStmt) queryStmt;
        int effectRows = 0;
        if (selectStmt.getValueList() != null) {
            Table tbl = txnEntry.getTable();
            int schemaSize = tbl.getBaseSchema(false).size();
            if (parsedStmt instanceof NativeInsertStmt
                    && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
                NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
                if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)) {
                    schemaSize++;
                }
                if (nativeInsertStmt.containTargetColumnName(Column.DELETE_SIGN)) {
                    schemaSize++;
                }
            }
            for (List<Expr> row : selectStmt.getValueList().getRows()) {
                // the value columns are columns which are visible to user, so here we use
                // getBaseSchema(), not getFullSchema()
                if (schemaSize != row.size()) {
                    throw new TException("Column count doesn't match value count");
                }
            }
            FormatOptions options = FormatOptions.getDefault();
            for (List<Expr> row : selectStmt.getValueList().getRows()) {
                ++effectRows;
                InternalService.PDataRow data = StmtExecutor.getRowStringValue(row, options);
                if (data == null) {
                    continue;
                }
                List<InternalService.PDataRow> dataToSend = txnEntry.getDataToSend();
                dataToSend.add(data);
                if (dataToSend.size() >= MAX_DATA_TO_SEND_FOR_TXN) {
                    // send data
                    InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);
                    executor.sendData();
                }
            }
        }
        txnEntry.setRowsInTransaction(txnEntry.getRowsInTransaction() + effectRows);
        return effectRows;
    }

    private void beginTxn(String dbName, String tblName) throws UserException, TException,
            InterruptedException, ExecutionException, TimeoutException {
        TransactionEntry txnEntry = context.getTxnEntry();
        TTxnParams txnConf = txnEntry.getTxnConf();
        SessionVariable sessionVariable = context.getSessionVariable();
        long timeoutSecond = context.getExecTimeoutS();

        TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
        Database dbObj = Env.getCurrentInternalCatalog()
                .getDbOrException(dbName, s -> new TException("database is invalid for dbName: " + s));
        Table tblObj = dbObj.getTableOrException(tblName, s -> new TException("table is invalid: " + s));
        txnConf.setDbId(dbObj.getId()).setTbl(tblName).setDb(dbName);
        txnEntry.setTable(tblObj);
        txnEntry.setDb(dbObj);
        String label = txnEntry.getLabel();
        if (Env.getCurrentEnv().isMaster()) {
            long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                    txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
                    new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
                            FrontendOptions.getLocalHostAddress(),
                            ExecuteEnv.getInstance().getStartupTime()),
                    sourceType, timeoutSecond);
            txnConf.setTxnId(txnId);
            String token = Env.getCurrentEnv().getTokenManager().acquireToken();
            txnConf.setToken(token);
        } else {
            String token = Env.getCurrentEnv().getTokenManager().acquireToken();
            MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context);
            TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
            request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
                    .setLabel(label).setUser("").setUserIp("").setPasswd("");
            TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
            txnConf.setTxnId(result.getTxnId());
            txnConf.setToken(token);
        }

        TStreamLoadPutRequest request = new TStreamLoadPutRequest();

        long maxExecMemByte = sessionVariable.getMaxExecMemByte();
        String timeZone = sessionVariable.getTimeZone();
        int sendBatchParallelism = sessionVariable.getSendBatchParallelism();

        request.setTxnId(txnConf.getTxnId()).setDb(txnConf.getDb())
                .setTbl(txnConf.getTbl())
                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
                .setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond)
                .setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism).setTrimDoubleQuotes(true);
        if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
            NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
            if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)
                    || nativeInsertStmt.containTargetColumnName(Column.DELETE_SIGN)) {
                if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)) {
                    request.setSequenceCol(Column.SEQUENCE_COL);
                }
                request.setColumns("`" + String.join("`,`", nativeInsertStmt.getTargetColumnNames()) + "`");
            }
        }

        // execute begin txn
        InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);
        executor.beginTransaction(request);
    }

    // Process an insert statement.
    private void handleInsertStmt() throws Exception {
        if (context.getConnectType() == ConnectType.MYSQL) {
            // Every time set no send flag and clean all data in buffer
            context.getMysqlChannel().reset();
        }
        InsertStmt insertStmt = (InsertStmt) parsedStmt;
        // create plan
        if (insertStmt.getQueryStmt().hasOutFileClause()) {
            throw new DdlException("Not support OUTFILE clause in INSERT statement");
        }

        if (insertStmt.getQueryStmt().isExplain()) {
            ExplainOptions explainOptions = insertStmt.getQueryStmt().getExplainOptions();
            insertStmt.setIsExplain(explainOptions);
            String explainString = planner.getExplainString(explainOptions);
            handleExplainStmt(explainString, false);
            return;
        }

        analyzeVariablesInStmt(insertStmt.getQueryStmt());
        long createTime = System.currentTimeMillis();
        Throwable throwable = null;
        long txnId = -1;
        String label = "";
        long loadedRows = 0;
        int filteredRows = 0;
        TransactionStatus txnStatus = TransactionStatus.ABORTED;
        String errMsg = "";
        TableType tblType = insertStmt.getTargetTable().getType();
        boolean isGroupCommit = false;
        boolean reuseGroupCommitPlan = false;
        if (context.isTxnModel()) {
            if (insertStmt.getQueryStmt() instanceof SelectStmt) {
                if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
                    throw new TException("Insert into ** select is not supported in a transaction");
                }
            }
            txnStatus = TransactionStatus.PREPARE;
            loadedRows = executeForTxn(insertStmt);
            label = context.getTxnEntry().getLabel();
            txnId = context.getTxnEntry().getTxnConf().getTxnId();
        } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
            isGroupCommit = true;
            NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
            long dbId = nativeInsertStmt.getTargetTable().getDatabase().getId();
            long tableId = nativeInsertStmt.getTargetTable().getId();
            int maxRetry = 3;
            for (int i = 0; i < maxRetry; i++) {
                GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
                reuseGroupCommitPlan = nativeInsertStmt.isReuseGroupCommitPlan();
                List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
                PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
                TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
                ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
                if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
                        .contains("schema version not match")) {
                    LOG.info("group commit insert failed. stmt: {}, query_id: {}, db_id: {}, table_id: {}"
                                    + ", schema version: {}, backend_id: {}, status: {}, retry: {}",
                            insertStmt.getOrigStmt().originStmt, DebugUtil.printId(context.queryId()), dbId, tableId,
                            nativeInsertStmt.getBaseSchemaVersion(), groupCommitPlanner.getBackendId(),
                            response.getStatus(), i);
                    if (i < maxRetry) {
                        List<TableIf> tables = Lists.newArrayList(insertStmt.getTargetTable());
                        tables.sort((Comparator.comparing(TableIf::getId)));
                        MetaLockUtils.readLockTables(tables);
                        try {
                            insertStmt.reset();
                            analyzer = new Analyzer(context.getEnv(), context);
                            analyzeAndGenerateQueryPlan(context.getSessionVariable().toThrift());
                        } finally {
                            MetaLockUtils.readUnlockTables(tables);
                        }
                        continue;
                    } else {
                        errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId
                                + ", query_id: " + DebugUtil.printId(context.queryId()) + ", backend_id: "
                                + groupCommitPlanner.getBackendId() + ", status: " + response.getStatus();
                        if (response.hasErrorUrl()) {
                            errMsg += ", error url: " + response.getErrorUrl();
                        }
                    }
                } else if (code != TStatusCode.OK) {
                    errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId + ", query_id: "
                            + DebugUtil.printId(context.queryId()) + ", backend_id: "
                            + groupCommitPlanner.getBackendId() + ", status: " + response.getStatus();
                    if (response.hasErrorUrl()) {
                        errMsg += ", error url: " + response.getErrorUrl();
                    }
                    ErrorReport.reportDdlException(errMsg.replaceAll("%", "%%"), ErrorCode.ERR_FAILED_WHEN_INSERT);
                }
                label = response.getLabel();
                txnStatus = TransactionStatus.PREPARE;
                txnId = response.getTxnId();
                loadedRows = response.getLoadedRows();
                filteredRows = (int) response.getFilteredRows();
                break;
            }
        } else {
            label = insertStmt.getLabel();
            LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId()));

            try {
                coord = EnvFactory.getInstance().createCoordinator(context, analyzer,
                        planner, context.getStatsErrorEstimator());
                coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
                coord.setQueryType(TQueryType.LOAD);
                profile.addExecutionProfile(coord.getExecutionProfile());
                QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
                QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);

                Table table = insertStmt.getTargetTable();
                if (table instanceof OlapTable) {
                    boolean isEnableMemtableOnSinkNode =
                            ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
                            ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
                    coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
                }
                coord.exec();
                int execTimeout = context.getExecTimeoutS();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Insert {} execution timeout:{}ms", DebugUtil.printId(context.queryId()), execTimeout);
                }
                boolean notTimeout = coord.join(execTimeout);
                if (!coord.isDone()) {
                    coord.cancel(new Status(TStatusCode.TIMEOUT, "query execute timeout"));
                    if (notTimeout) {
                        errMsg = coord.getExecStatus().getErrorMsg();
                        ErrorReport.reportDdlException("There exists unhealthy backend. "
                                + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
                    } else {
                        ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
                    }
                }

                if (!coord.getExecStatus().ok()) {
                    errMsg = coord.getExecStatus().getErrorMsg();
                    LOG.warn("insert failed: {}", errMsg);
                    ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("delta files is {}", coord.getDeltaUrls());
                }

                if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
                    loadedRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
                }
                if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
                    filteredRows = Integer.parseInt(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
                }

                // if in strict mode, insert will fail if there are filtered rows
                if (context.getSessionVariable().getEnableInsertStrict()) {
                    if (filteredRows > 0) {
                        context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
                                "Insert has filtered data in strict mode, tracking_url=" + coord.getTrackingUrl());
                        return;
                    }
                } else {
                    if (filteredRows > context.getSessionVariable().getInsertMaxFilterRatio()
                            * (filteredRows + loadedRows)) {
                        context.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
                                String.format("Insert has too many filtered data %d/%d insert_max_filter_ratio is %f",
                                        filteredRows, filteredRows + loadedRows,
                                        context.getSessionVariable().getInsertMaxFilterRatio()));
                        return;
                    }
                }

                if (tblType != TableType.OLAP && tblType != TableType.MATERIALIZED_VIEW) {
                    // no need to add load job.
                    // MySQL table is already being inserted.
                    context.getState().setOk(loadedRows, filteredRows, null);
                    return;
                }

                if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
                        insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()),
                        insertStmt.getTransactionId(),
                        TabletCommitInfo.fromThrift(coord.getCommitInfos()),
                        context.getSessionVariable().getInsertVisibleTimeoutMs())) {
                    txnStatus = TransactionStatus.VISIBLE;
                } else {
                    txnStatus = TransactionStatus.COMMITTED;
                }
                // TODO(meiyi)
                // insertStmt.afterFinishTxn(true);
                if (Config.isCloudMode()) {
                    String clusterName = context.getCloudCluster();
                    if (context.getSessionVariable().enableMultiClusterSyncLoad()
                            && clusterName != null && !clusterName.isEmpty()) {
                        CloudSystemInfoService infoService = (CloudSystemInfoService) Env.getCurrentSystemInfo();
                        List<List<Backend>> backendsList = infoService
                                                                .getCloudClusterNames()
                                                                .stream()
                                                                .filter(name -> !name.equals(clusterName))
                                                                .map(name -> infoService.getBackendsByClusterName(name))
                                                                .collect(Collectors.toList());
                        List<Long> allTabletIds = ((OlapTable) insertStmt.getTargetTable()).getAllTabletIds();
                        syncLoadForTablets(backendsList, allTabletIds);
                    }
                }
            } catch (Throwable t) {
                // if any throwable being thrown during insert operation, first we should abort this txn
                LOG.warn("handle insert stmt fail: {}", label, t);
                try {
                    Env.getCurrentGlobalTransactionMgr().abortTransaction(
                            insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
                            t.getMessage() == null ? "unknown reason" : t.getMessage());
                } catch (Exception abortTxnException) {
                    // just print a log if abort txn failed. This failure do not need to pass to user.
                    // user only concern abort how txn failed.
                    LOG.warn("errors when abort txn", abortTxnException);
                }

                // cloud mode, insert into select meet -230, retry
                if (Config.isCloudMode() && t.getMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
                    LOG.warn("insert into select meet E-230, retry again");
                    resetAnalyzerAndStmt();
                    if (insertStmt instanceof NativeInsertStmt) {
                        ((NativeInsertStmt) insertStmt).resetPrepare();
                    }
                    throw t;
                }

                StringBuilder sb = new StringBuilder(t.getMessage());
                if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
                    sb.append(". url: " + coord.getTrackingUrl());
                }
                context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
                return;
            } finally {
                if (coord != null) {
                    coord.close();
                }
                finalizeQuery();
            }

            // Go here, which means:
            // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
            // 2. transaction failed but Config.using_old_load_usage_pattern is true.
            // we will record the load job info for these 2 cases
            txnId = insertStmt.getTransactionId();
            try {
                context.getEnv().getLoadManager()
                        .recordFinishedLoadJob(label, txnId, insertStmt.getDbName(),
                                insertStmt.getTargetTable().getId(),
                                EtlJobType.INSERT, createTime, throwable == null ? "" : throwable.getMessage(),
                                coord.getTrackingUrl(), insertStmt.getUserInfo(), 0L);
            } catch (MetaNotFoundException e) {
                LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
                errMsg = "Record info of insert load with error " + e.getMessage();
            }
        }

        // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
        // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
        StringBuilder sb = new StringBuilder();
        sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name());
        sb.append("', 'txnId':'").append(txnId).append("'");
        if (tblType == TableType.MATERIALIZED_VIEW) {
            sb.append("', 'rows':'").append(loadedRows).append("'");
        }
        if (!Strings.isNullOrEmpty(errMsg)) {
            sb.append(", 'err':'").append(errMsg).append("'");
        }
        if (isGroupCommit) {
            sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
            if (reuseGroupCommitPlan) {
                sb.append(", 'reuse_group_commit_plan':'").append(true).append("'");
            }
        }
        sb.append("}");

        context.getState().setOk(loadedRows, filteredRows, sb.toString());

        // set insert result in connection context,
        // so that user can use `show insert result` to get info of the last insert operation.
        context.setOrUpdateInsertResult(txnId, label, insertStmt.getDbName(), insertStmt.getTbl(),
                txnStatus, loadedRows, filteredRows);
        // update it, so that user can get loaded rows in fe.audit.log
        context.updateReturnRows((int) loadedRows);
    }

    public static void syncLoadForTablets(List<List<Backend>> backendsList, List<Long> allTabletIds) {
        backendsList.forEach(backends -> backends.forEach(backend -> {
            if (backend.isAlive()) {
                List<Long> tabletIdList = new ArrayList<Long>();
                Set<Long> beTabletIds = ((CloudEnv) Env.getCurrentEnv())
                                           .getCloudTabletRebalancer()
                                           .getSnapshotTabletsInPrimaryByBeId(backend.getId());
                allTabletIds.forEach(tabletId -> {
                    if (beTabletIds.contains(tabletId)) {
                        tabletIdList.add(tabletId);
                    }
                });
                boolean ok = false;
                TNetworkAddress address = null;
                Client client = null;
                try {
                    address = new TNetworkAddress(backend.getHost(), backend.getBePort());
                    client = ClientPool.backendPool.borrowObject(address);
                    client.syncLoadForTablets(new TSyncLoadForTabletsRequest(allTabletIds));
                    ok = true;
                } catch (Exception e) {
                    LOG.warn(e.getMessage());
                } finally {
                    if (!ok) {
                        ClientPool.backendPool.invalidateObject(address, client);
                    } else {
                        ClientPool.backendPool.returnObject(address, client);
                    }
                }
            }
        }));
    }

    private void handleExternalInsertStmt() {
        // TODO(tsy): load refactor, handle external load here
        try {
            InsertStmt insertStmt = (InsertStmt) parsedStmt;
            LoadType loadType = insertStmt.getLoadType();
            if (loadType == LoadType.UNKNOWN) {
                throw new DdlException("Unknown load job type");
            }
            LoadManagerAdapter loadManagerAdapter = context.getEnv().getLoadManagerAdapter();
            loadManagerAdapter.submitLoadFromInsertStmt(context, insertStmt);
            // when complete
            if (loadManagerAdapter.getMysqlLoadId() != null) {
                this.mysqlLoadId = loadManagerAdapter.getMysqlLoadId();
            }
        } catch (UserException e) {
            // Return message to info client what happened.
            if (LOG.isDebugEnabled()) {
                LOG.debug("DDL statement({}) process failed.", originStmt.originStmt, e);
            }
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
        }
    }

    private void handleUnsupportedStmt() {
        if (context.getConnectType() == ConnectType.MYSQL) {
            context.getMysqlChannel().reset();
        }
        // do nothing
        context.getState().setOk();
    }

    private void handleAnalyzeStmt() throws DdlException, AnalysisException {
        context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy);
    }

    // Process switch catalog
    private void handleSwitchStmt() throws AnalysisException {
        SwitchStmt switchStmt = (SwitchStmt) parsedStmt;
        try {
            context.getEnv().changeCatalog(context, switchStmt.getCatalogName());
        } catch (DdlException e) {
            LOG.warn("", e);
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            return;
        }
        context.getState().setOk();
    }

    // Process use statement.
    private void handleUseStmt() throws AnalysisException {
        UseStmt useStmt = (UseStmt) parsedStmt;
        try {
            if (useStmt.getCatalogName() != null) {
                context.getEnv().changeCatalog(context, useStmt.getCatalogName());
            }
            context.getEnv().changeDb(context, useStmt.getDatabase());
        } catch (DdlException e) {
            LOG.warn("", e);
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            return;
        }
        context.getState().setOk();
    }

    private void handleUseCloudClusterStmt() throws AnalysisException {
        if (!Config.isCloudMode()) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_CLOUD_MODE);
            return;
        }

        UseCloudClusterStmt useCloudClusterStmt = (UseCloudClusterStmt) parsedStmt;
        try {
            ((CloudEnv) context.getEnv()).changeCloudCluster(useCloudClusterStmt.getCluster(), context);
        } catch (DdlException e) {
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            return;
        }

        if (Strings.isNullOrEmpty(useCloudClusterStmt.getDatabase())) {
            return;
        }

        try {
            if (useCloudClusterStmt.getCatalogName() != null) {
                context.getEnv().changeCatalog(context, useCloudClusterStmt.getCatalogName());
            }
            context.getEnv().changeDb(context, useCloudClusterStmt.getDatabase());
        } catch (DdlException e) {
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            return;
        }

        context.getState().setOk();
    }

    private void handleWarmUpStmt() throws IOException, AnalysisException {
        WarmUpClusterStmt stmt = (WarmUpClusterStmt) parsedStmt;
        long jobId = -1;
        try {
            jobId = ((CloudEnv) context.getEnv()).getCacheHotspotMgr().createJob(stmt);
            ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
            builder.addColumn(new Column("JobId", ScalarType.createVarchar(30)));
            List<List<String>> infos = Lists.newArrayList();
            List<String> info = Lists.newArrayList();
            info.add(String.valueOf(jobId));
            infos.add(info);
            ShowResultSet resultSet = new ShowResultSet(builder.build(), infos);
            if (resultSet == null) {
                // state changed in execute
                return;
            }
            if (isProxy) {
                proxyShowResultSet = resultSet;
                return;
            }

            sendResultSet(resultSet);
        } catch (AnalysisException e) {
            LOG.info("failed to create a warm up job, error: {}", e.getMessage());
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
        }
    }

    private void sendMetaData(ResultSetMetaData metaData) throws IOException {
        sendMetaData(metaData, null);
    }

    private void sendMetaData(ResultSetMetaData metaData, List<FieldInfo> fieldInfos) throws IOException {
        Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
        // sends how many columns
        serializer.reset();
        serializer.writeVInt(metaData.getColumnCount());
        context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        // send field one by one
        for (int i = 0; i < metaData.getColumns().size(); i++) {
            Column col = metaData.getColumn(i);
            serializer.reset();
            if (fieldInfos == null) {
                // TODO(zhaochun): only support varchar type
                serializer.writeField(col.getName(), col.getType());
            } else {
                serializer.writeField(fieldInfos.get(i), col.getType());
            }
            context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        }
        // send EOF
        serializer.reset();
        MysqlEofPacket eofPacket = new MysqlEofPacket(context.getState());
        eofPacket.writeTo(serializer);
        context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
    }

    private List<PrimitiveType> exprToStringType(List<Expr> exprs) {
        return exprs.stream().map(e -> PrimitiveType.STRING).collect(Collectors.toList());
    }

    public void sendStmtPrepareOK(int stmtId, List<String> labels, List<Slot> output) throws IOException {
        Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
        // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_prepare.html#sect_protocol_com_stmt_prepare_response
        serializer.reset();
        // 0x00 OK
        serializer.writeInt1(0);
        // statement_id
        serializer.writeInt4(stmtId);
        // num_columns
        int numColumns = output == null ? 0 : output.size();
        serializer.writeInt2(numColumns);
        // num_params
        int numParams = labels.size();
        serializer.writeInt2(numParams);
        // reserved_1
        serializer.writeInt1(0);
        if (numParams > 0 || numColumns > 0) {
            // warning_count
            serializer.writeInt2(0);
            // metadata_follows
            serializer.writeInt1(1);
        }
        context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        if (numParams > 0) {
            // send field one by one
            // TODO use real type instead of string, for JDBC client it's ok
            // but for other client, type should be correct
            // List<PrimitiveType> types = exprToStringType(labels);
            List<String> colNames = labels;
            for (int i = 0; i < colNames.size(); ++i) {
                serializer.reset();
                // serializer.writeField(colNames.get(i), Type.fromPrimitiveType(types.get(i)));
                serializer.writeField(colNames.get(i), Type.STRING);
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
            serializer.reset();
            if (!context.getMysqlChannel().clientDeprecatedEOF()) {
                MysqlEofPacket eofPacket = new MysqlEofPacket(context.getState());
                eofPacket.writeTo(serializer);
            } else {
                MysqlOkPacket okPacket = new MysqlOkPacket(context.getState());
                okPacket.writeTo(serializer);
            }
            context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        }
        if (numColumns > 0) {
            for (Slot slot : output) {
                serializer.reset();
                if (slot instanceof SlotReference
                        && ((SlotReference) slot).getOriginalColumn().isPresent()
                        && ((SlotReference) slot).getOriginalTable().isPresent()) {
                    SlotReference slotReference = (SlotReference) slot;
                    TableIf table = slotReference.getOriginalTable().get();
                    Column column = slotReference.getOriginalColumn().get();
                    DatabaseIf database = table.getDatabase();
                    String dbName = database == null ? "" : database.getFullName();
                    serializer.writeField(dbName, table.getName(), column, false);
                } else {
                    serializer.writeField(slot.getName(), slot.getDataType().toCatalogDataType());
                }
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
            serializer.reset();
            if (!context.getMysqlChannel().clientDeprecatedEOF()) {
                MysqlEofPacket eofPacket = new MysqlEofPacket(context.getState());
                eofPacket.writeTo(serializer);
            } else {
                MysqlOkPacket okPacket = new MysqlOkPacket(context.getState());
                okPacket.writeTo(serializer);
            }
            context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        }
        context.getMysqlChannel().flush();
        context.getState().setNoop();
    }

    private void sendFields(List<String> colNames, List<Type> types) throws IOException {
        sendFields(colNames, null, types);
    }

    private void sendFields(List<String> colNames, List<FieldInfo> fieldInfos, List<Type> types) throws IOException {
        Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
        // sends how many columns
        serializer.reset();
        serializer.writeVInt(colNames.size());
        if (LOG.isDebugEnabled()) {
            LOG.debug("sendFields {}", colNames);
        }
        context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
        StatementContext statementContext = context.getStatementContext();
        boolean isShortCircuited = statementContext.isShortCircuitQuery()
                        && statementContext.getShortCircuitQueryContext() != null;
        ShortCircuitQueryContext ctx = statementContext.getShortCircuitQueryContext();
        // send field one by one
        for (int i = 0; i < colNames.size(); ++i) {
            serializer.reset();
            if (context.getCommand() == MysqlCommand.COM_STMT_EXECUTE && isShortCircuited) {
                // Using PreparedStatment pre serializedField to avoid serialize each time
                // we send a field
                byte[] serializedField = ctx.getSerializedField(i);
                if (serializedField == null) {
                    if (fieldInfos != null) {
                        serializer.writeField(fieldInfos.get(i), types.get(i));
                    } else {
                        serializer.writeField(colNames.get(i), types.get(i));
                    }
                    serializedField = serializer.toArray();
                    ctx.addSerializedField(i, serializedField);
                }
                context.getMysqlChannel().sendOnePacket(ByteBuffer.wrap(serializedField));
            } else {
                if (fieldInfos != null) {
                    serializer.writeField(fieldInfos.get(i), types.get(i));
                } else {
                    serializer.writeField(colNames.get(i), types.get(i));
                }
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
        }
        // send EOF
        serializer.reset();
        MysqlEofPacket eofPacket = new MysqlEofPacket(context.getState());
        eofPacket.writeTo(serializer);
        context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
    }

    public void sendResultSet(ResultSet resultSet) throws IOException {
        sendResultSet(resultSet, null);
    }

    public void sendResultSet(ResultSet resultSet, List<FieldInfo> fieldInfos) throws IOException {
        if (context.getConnectType().equals(ConnectType.MYSQL)) {
            context.updateReturnRows(resultSet.getResultRows().size());
            // Send meta data.
            sendMetaData(resultSet.getMetaData(), fieldInfos);

            // Send result set.
            for (List<String> row : resultSet.getResultRows()) {
                serializer.reset();
                for (String item : row) {
                    if (item == null || item.equals(FeConstants.null_string)) {
                        serializer.writeNull();
                    } else {
                        serializer.writeLenEncodedString(item);
                    }
                }
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }

            context.getState().setEof();
        } else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
            context.updateReturnRows(resultSet.getResultRows().size());
            context.getFlightSqlChannel()
                    .addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), resultSet);
            context.getState().setEof();
        } else {
            LOG.error("sendResultSet error connect type");
        }
    }

    // Process show statement
    private void handleShow() throws IOException, AnalysisException, DdlException {
        ShowExecutor executor = new ShowExecutor(context, (ShowStmt) parsedStmt);
        ShowResultSet resultSet = executor.execute();
        if (resultSet == null) {
            // state changed in execute
            return;
        }
        if (isProxy) {
            proxyShowResultSet = resultSet;
            return;
        }

        sendResultSet(resultSet);
    }

    private void handleUnlockTablesStmt() {
    }

    private void handleLockTablesStmt() {
    }

    public void handleShowConstraintStmt(List<List<String>> result) throws IOException {
        ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                .addColumn(new Column("Name", ScalarType.createVarchar(20)))
                .addColumn(new Column("Type", ScalarType.createVarchar(20)))
                .addColumn(new Column("Definition", ScalarType.createVarchar(20)))
                .build();
        ResultSet resultSet = new ShowResultSet(metaData, result);
        sendResultSet(resultSet);
    }

    public void handleShowCreateMTMVStmt(List<List<String>> result) throws IOException {
        ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                .addColumn(new Column("Materialized View", ScalarType.createVarchar(20)))
                .addColumn(new Column("Create Materialized View", ScalarType.createVarchar(30)))
                .build();
        ResultSet resultSet = new ShowResultSet(metaData, result);
        sendResultSet(resultSet);
    }

    public void handleExplainPlanProcessStmt(List<PlanProcess> result) throws IOException {
        ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                .addColumn(new Column("Rule", ScalarType.createVarchar(-1)))
                .addColumn(new Column("Before", ScalarType.createVarchar(-1)))
                .addColumn(new Column("After", ScalarType.createVarchar(-1)))
                .build();
        if (context.getConnectType() == ConnectType.MYSQL) {
            sendMetaData(metaData);

            for (PlanProcess row : result) {
                serializer.reset();
                serializer.writeLenEncodedString(row.ruleName);
                serializer.writeLenEncodedString(row.beforeShape);
                serializer.writeLenEncodedString(row.afterShape);
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
        }
        context.getState().setEof();
    }

    public void handleExplainStmt(String result, boolean isNereids) throws IOException {
        ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                .addColumn(new Column("Explain String" + (isNereids ? "(Nereids Planner)" : "(Old Planner)"),
                        ScalarType.createVarchar(20)))
                .build();
        if (context.getConnectType() == ConnectType.MYSQL) {
            sendMetaData(metaData);

            // Send result set.
            for (String item : result.split("\n")) {
                serializer.reset();
                serializer.writeLenEncodedString(item);
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
        } else if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
            context.getFlightSqlChannel()
                    .addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), metaData, result);
            context.setReturnResultFromLocal(true);
        }
        context.getState().setEof();
    }

    public void handleReplayStmt(String result) throws IOException {
        ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
                .addColumn(new Column("Plan Replayer dump url",
                ScalarType.createVarchar(20)))
                .build();
        if (context.getConnectType() == ConnectType.MYSQL) {
            sendMetaData(metaData);

            // Send result set.
            for (String item : result.split("\n")) {
                serializer.reset();
                serializer.writeLenEncodedString(item);
                context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
            }
        } else if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
            context.getFlightSqlChannel()
                    .addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), metaData, result);
            context.setReturnResultFromLocal(true);
        }
        context.getState().setEof();
    }

    private void handleLoadStmt() {
        try {
            LoadStmt loadStmt = (LoadStmt) parsedStmt;
            EtlJobType jobType = loadStmt.getEtlJobType();
            if (jobType == EtlJobType.UNKNOWN) {
                throw new DdlException("Unknown load job type");
            }
            if (jobType == EtlJobType.HADOOP) {
                throw new DdlException("Load job by hadoop cluster is disabled."
                        + " Try using broker load. See 'help broker load;'");
            }
            LoadManager loadManager = context.getEnv().getLoadManager();
            if (jobType == EtlJobType.LOCAL_FILE) {
                if (!context.getCapability().supportClientLocalFile()) {
                    context.getState().setError(ErrorCode.ERR_NOT_ALLOWED_COMMAND, "This client is not support"
                            + " to load client local file.");
                    return;
                }
                String loadId = UUID.randomUUID().toString();
                mysqlLoadId = loadId;
                LoadJobRowResult submitResult = loadManager.getMysqlLoadManager()
                        .executeMySqlLoadJobFromStmt(context, loadStmt, loadId);
                context.getState().setOk(submitResult.getRecords(), submitResult.getWarnings(),
                        submitResult.toString());
            } else {
                loadManager.createLoadJobFromStmt(loadStmt);
                context.getState().setOk();
            }
        } catch (UserException e) {
            // Return message to info client what happened.
            if (LOG.isDebugEnabled()) {
                LOG.debug("DDL statement({}) process failed.", originStmt.originStmt, e);
            }
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
        }
    }

    private void handleUpdateStmt() {
        try {
            UpdateStmt updateStmt = (UpdateStmt) parsedStmt;
            parsedStmt = updateStmt.getInsertStmt();
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("update data error, stmt={}", updateStmt.toSql());
            }
        } catch (Exception e) {
            LOG.warn("update data error, stmt={}", parsedStmt.toSql(), e);
        }
    }

    private void handleDeleteStmt() {
        try {
            DeleteStmt deleteStmt = (DeleteStmt) parsedStmt;
            parsedStmt = deleteStmt.getInsertStmt();
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("delete data error, stmt={}", deleteStmt.toSql());
            }
        } catch (Exception e) {
            LOG.warn("delete data error, stmt={}", parsedStmt.toSql(), e);
        }
    }

    private void handleDdlStmt() {
        try {
            DdlExecutor.execute(context.getEnv(), (DdlStmt) parsedStmt);
            if (!(parsedStmt instanceof AnalyzeStmt)) {
                context.getState().setOk();
            }
            // copy into used
            if (context.getState().getResultSet() != null) {
                if (isProxy) {
                    proxyShowResultSet = context.getState().getResultSet();
                    return;
                }
                sendResultSet(context.getState().getResultSet());
            }
        } catch (QueryStateException e) {
            LOG.warn("", e);
            context.setState(e.getQueryState());
        } catch (UserException e) {
            // Return message to info client what happened.
            LOG.warn("DDL statement({}) process failed.", originStmt.originStmt, e);
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
        }
    }

    private void handleExportStmt() throws Exception {
        ExportStmt exportStmt = (ExportStmt) parsedStmt;
        context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt.getExportJob());
    }

    private void handleCtasStmt() {
        CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt;
        try {
            // create table
            DdlExecutor.execute(context.getEnv(), ctasStmt);
            context.getState().setOk();
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            return;
        }
        if (ctasStmt.isTableHasExists()) {
            return;
        }
        // after success create table insert data
        try {
            parsedStmt = ctasStmt.getInsertStmt();
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql());
                handleCtasRollback(ctasStmt.getCreateTableStmt().getDbTbl());
            }
        } catch (Exception e) {
            LOG.warn("CTAS insert data error, stmt={}", ctasStmt.toSql(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleCtasRollback(ctasStmt.getCreateTableStmt().getDbTbl());
        }
    }

    private void handleCtasRollback(TableName table) {
        if (context.getSessionVariable().isDropTableIfCtasFailed()) {
            // insert error drop table
            DropTableStmt dropTableStmt = new DropTableStmt(true, table, true);
            try {
                DdlExecutor.execute(context.getEnv(), dropTableStmt);
            } catch (Exception ex) {
                LOG.warn("CTAS drop table error, stmt={}", parsedStmt.toSql(), ex);
                context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
            }
        }
    }

    private void handleIotStmt() throws AnalysisException {
        ConnectContext.get().setSkipAuth(true);
        try {
            InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt;
            if (iotStmt.getPartitionNames().size() == 0) {
                // insert overwrite table
                handleOverwriteTable(iotStmt);
            } else if (iotStmt.isAutoDetectPartition()) {
                // insert overwrite table auto detect which partitions need to replace
                handleAutoOverwritePartition(iotStmt);
            } else {
                // insert overwrite table with partition
                handleOverwritePartition(iotStmt);
            }
        } finally {
            ConnectContext.get().setSkipAuth(false);
        }
    }

    private void handleOverwriteTable(InsertOverwriteTableStmt iotStmt) {
        UUID uuid = UUID.randomUUID();
        // to comply with naming rules
        TableName tmpTableName = new TableName(null, iotStmt.getDb(), "tmp_table_" + uuid.toString().replace('-', '_'));
        TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
        try {
            // create a tmp table with uuid
            parsedStmt = new CreateTableLikeStmt(false, false, tmpTableName, targetTableName, null, false);
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            // if create tmp table err, return
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                // There is already an error message in the execute() function, so there is no need to set it here
                LOG.warn("IOT create table error, stmt={}", originStmt.originStmt);
                return;
            }
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("IOT create a tmp table error, stmt={}", originStmt.originStmt, e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            return;
        }
        // after success create table insert data
        // when overwrite table, allow auto partition or not is controlled by session variable.
        boolean allowAutoPartition = context.getSessionVariable().isEnableAutoCreateWhenOverwrite();
        try {
            parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), allowAutoPartition);
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql());
                handleIotRollback(tmpTableName);
                return;
            }
        } catch (Exception e) {
            LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleIotRollback(tmpTableName);
            return;
        }

        // overwrite old table with tmp table
        try {
            List<AlterClause> ops = new ArrayList<>();
            Map<String, String> properties = new HashMap<>();
            properties.put("swap", "false");
            // swap false. but this operation is internal. so we will consider it as force drop for original table.
            ops.add(new ReplaceTableClause(tmpTableName.getTbl(), properties, true));
            parsedStmt = new AlterTableStmt(targetTableName, ops);
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("IOT overwrite table error, stmt={}", parsedStmt.toSql());
                handleIotRollback(tmpTableName);
                return;
            }
            context.getState().setOk();
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("IOT overwrite table error, stmt={}", parsedStmt.toSql(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleIotRollback(tmpTableName);
        }

    }

    private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) {
        TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
        List<String> partitionNames = iotStmt.getPartitionNames();
        List<String> tempPartitionName = new ArrayList<>();
        try {
            // create tmp partitions with uuid
            for (String partitionName : partitionNames) {
                UUID uuid = UUID.randomUUID();
                // to comply with naming rules
                String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_');
                List<AlterClause> ops = new ArrayList<>();
                ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true));
                parsedStmt = new AlterTableStmt(targetTableName, ops);
                parsedStmt.setUserInfo(context.getCurrentUserIdentity());
                execute();
                if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                    LOG.warn("IOT create tmp partitions error, stmt={}", originStmt.originStmt);
                    handleIotPartitionRollback(targetTableName, tempPartitionName);
                    return;
                }
                // only when execution succeeded, put the temp partition name into list
                tempPartitionName.add(tempPartName);
            }
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("IOT create tmp table partitions error, stmt={}", originStmt.originStmt, e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleIotPartitionRollback(targetTableName, tempPartitionName);
            return;
        }
        // after success add tmp partitions
        // when overwrite partition, auto creating is always disallowed.
        try {
            parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName),
                    new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(),
                    iotStmt.getHints(), iotStmt.getCols(), false);
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql());
                handleIotPartitionRollback(targetTableName, tempPartitionName);
                return;
            }
        } catch (Exception e) {
            LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleIotPartitionRollback(targetTableName, tempPartitionName);
            return;
        }

        // overwrite old partition with tmp partition
        try {
            List<AlterClause> ops = new ArrayList<>();
            Map<String, String> properties = new HashMap<>();
            properties.put("use_temp_partition_name", "false");
            ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames),
                    new PartitionNames(true, tempPartitionName), true, properties));
            parsedStmt = new AlterTableStmt(targetTableName, ops);
            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
            execute();
            if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
                LOG.warn("IOT overwrite table partitions error, stmt={}", parsedStmt.toSql());
                handleIotPartitionRollback(targetTableName, tempPartitionName);
                return;
            }
            context.getState().setOk();
        } catch (Exception e) {
            // Maybe our bug
            LOG.warn("IOT overwrite table partitions error, stmt={}", parsedStmt.toSql(), e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
            handleIotPartitionRollback(targetTableName, tempPartitionName);
        }
    }

    private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) throws AnalysisException {
        throw new AnalysisException(
                "insert overwrite auto detect is not support in legacy planner. use nereids instead");
    }

    private void handleIotRollback(TableName table) {
        // insert error drop the tmp table
        DropTableStmt dropTableStmt = new DropTableStmt(true, table, true);
        try {
            Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), context);
            dropTableStmt.analyze(tempAnalyzer);
            DdlExecutor.execute(context.getEnv(), dropTableStmt);
        } catch (Exception ex) {
            LOG.warn("IOT drop table error, stmt={}", parsedStmt.toSql(), ex);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
        }
    }

    private void handleIotPartitionRollback(TableName targetTableName, List<String> tempPartitionNames) {
        // insert error drop the tmp partitions
        try {
            for (String partitionName : tempPartitionNames) {
                List<AlterClause> ops = new ArrayList<>();
                ops.add(new DropPartitionClause(true, partitionName, true, true));
                AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops);
                Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), context);
                dropTablePartitionStmt.analyze(tempAnalyzer);
                DdlExecutor.execute(context.getEnv(), dropTablePartitionStmt);
            }
        } catch (Exception ex) {
            LOG.warn("IOT drop partitions error, stmt={}", parsedStmt.toSql(), ex);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
        }
    }

    public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
        if (statisticsForAuditLog == null) {
            statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
        }
        if (!statisticsForAuditLog.hasScanBytes()) {
            statisticsForAuditLog.setScanBytes(0L);
        }
        if (!statisticsForAuditLog.hasScanRows()) {
            statisticsForAuditLog.setScanRows(0L);
        }
        if (!statisticsForAuditLog.hasReturnedRows()) {
            statisticsForAuditLog.setReturnedRows(0L);
        }
        if (!statisticsForAuditLog.hasCpuMs()) {
            statisticsForAuditLog.setCpuMs(0L);
        }
        return statisticsForAuditLog.build();
    }

    private boolean isShortCircuitedWithCtx() {
        return statementContext.isShortCircuitQuery()
                        && statementContext.getShortCircuitQueryContext() != null;
    }

    private List<Type> exprToType(List<Expr> exprs) {
        return exprs.stream().map(e -> e.getType()).collect(Collectors.toList());
    }

    public StatementBase setParsedStmt(StatementBase parsedStmt) {
        this.parsedStmt = parsedStmt;
        this.statementContext.setParsedStatement(parsedStmt);
        return parsedStmt;
    }

    public List<Slot> planPrepareStatementSlots() throws Exception {
        parseByNereids();
        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
                "Nereids only process LogicalPlanAdapter,"
                        + " but parsedStmt is " + parsedStmt.getClass().getName());
        NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
        nereidsPlanner.plan(parsedStmt, context.getSessionVariable().toThrift());
        return nereidsPlanner.getPhysicalPlan().getOutput();
    }

    public List<ResultRow> executeInternalQuery() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("INTERNAL QUERY: {}", originStmt.toString());
        }
        UUID uuid = UUID.randomUUID();
        TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        context.setQueryId(queryId);
        if (originStmt.originStmt != null) {
            context.setSqlHash(DigestUtils.md5Hex(originStmt.originStmt));
        }
        try {
            List<ResultRow> resultRows = new ArrayList<>();
            try {
                parseByNereids();
                Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
                        "Nereids only process LogicalPlanAdapter,"
                                + " but parsedStmt is " + parsedStmt.getClass().getName());
                context.getState().setNereids(true);
                context.getState().setIsQuery(true);
                planner = new NereidsPlanner(statementContext);
                planner.plan(parsedStmt, context.getSessionVariable().toThrift());
            } catch (Exception e) {
                LOG.warn("Failed to run internal SQL: {}", originStmt, e);
                throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
            }
            RowBatch batch;
            if (Config.enable_collect_internal_query_profile) {
                context.getSessionVariable().enableProfile = true;
            }
            coord = EnvFactory.getInstance().createCoordinator(context, analyzer,
                    planner, context.getStatsErrorEstimator());
            profile.addExecutionProfile(coord.getExecutionProfile());
            try {
                QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                        new QueryInfo(context, originStmt.originStmt, coord));
            } catch (UserException e) {
                throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
            }
            updateProfile(false);
            try {
                coord.exec();
            } catch (Exception e) {
                throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e);
            }

            try {
                while (true) {
                    batch = coord.getNext();
                    Preconditions.checkNotNull(batch, "Batch is Null.");
                    if (batch.isEos()) {
                        LOG.info("Result rows for query {} is {}", DebugUtil.printId(queryId), resultRows.size());
                        return resultRows;
                    } else {
                        // For null and not EOS batch, continue to get the next batch.
                        if (batch.getBatch() == null) {
                            continue;
                        }
                        if (batch.getBatch().getRows() != null) {
                            context.updateReturnRows(batch.getBatch().getRows().size());
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Batch size for query {} is {}",
                                        DebugUtil.printId(queryId), batch.getBatch().rows.size());
                            }
                        }
                        resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Result size for query {} is currently {}",
                                    DebugUtil.printId(queryId), resultRows.size());
                        }
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to fetch internal SQL result. " + Util.getRootCauseMessage(e), e);
            }
        } finally {
            if (coord != null) {
                coord.close();
            }
            AuditLogHelper.logAuditLog(context, originStmt.originStmt, parsedStmt, getQueryStatisticsForAuditLog(),
                    true);
            QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
            updateProfile(true);
        }
    }

    private List<ResultRow> convertResultBatchToResultRows(TResultBatch batch) {
        List<String> columns = parsedStmt.getColLabels();
        List<ResultRow> resultRows = new ArrayList<>();
        List<ByteBuffer> rows = batch.getRows();
        for (ByteBuffer buffer : rows) {
            List<String> values = Lists.newArrayList();
            InternalQueryBuffer queryBuffer = new InternalQueryBuffer(buffer.slice());

            for (int i = 0; i < columns.size(); i++) {
                String value = queryBuffer.readStringWithLength();
                values.add(value);
            }
            ResultRow resultRow = new ResultRow(values);
            resultRows.add(resultRow);
        }
        return resultRows;
    }

    public Coordinator getCoord() {
        return coord;
    }

    public List<String> getColumns() {
        return parsedStmt.getColLabels();
    }

    public List<Type> getReturnTypes() {
        return exprToType(parsedStmt.getResultExprs());
    }

    public List<Type> getReturnTypes(Queriable stmt) {
        if (isShortCircuitedWithCtx()) {
            return statementContext.getShortCircuitQueryContext().getReturnTypes();
        }
        return exprToType(stmt.getResultExprs());
    }

    private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
        LOG.info("TUniqueId: {} generate stream load plan", queryId);
        context.setQueryId(queryId);
        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());

        parseByNereids();
        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
                "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName());
        context.getState().setNereids(true);
        InsertIntoTableCommand insert = (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
        HttpStreamParams httpStreamParams = new HttpStreamParams();

        try {
            if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
                if (!Config.wait_internal_group_commit_finish && insert.getLabelName().isPresent()) {
                    throw new AnalysisException("label and group_commit can't be set at the same time");
                }
                context.setGroupCommit(true);
            }
            OlapInsertExecutor insertExecutor = (OlapInsertExecutor) insert.initPlan(context, this);
            httpStreamParams.setTxnId(insertExecutor.getTxnId());
            httpStreamParams.setDb(insertExecutor.getDatabase());
            httpStreamParams.setTable(insertExecutor.getTable());
            httpStreamParams.setLabel(insertExecutor.getLabelName());

            PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
            boolean isValidPlan = !planner.getScanNodes().isEmpty();
            for (ScanNode scanNode : planner.getScanNodes()) {
                if (!(scanNode instanceof TVFScanNode || planRoot instanceof GroupCommitScanNode)) {
                    isValidPlan = false;
                    break;
                }
            }
            if (!isValidPlan) {
                throw new AnalysisException("plan is invalid: " + planRoot.getExplainString());
            }
        } catch (QueryStateException e) {
            LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
            context.setState(e.getQueryState());
            throw new NereidsException("Command(" + originStmt.originStmt + ") process failed",
                    new AnalysisException(e.getMessage(), e));
        } catch (UserException e) {
            // Return message to info client what happened.
            LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            throw new NereidsException("Command (" + originStmt.originStmt + ") process failed",
                    new AnalysisException(e.getMessage(), e));
        } catch (Exception e) {
            // Maybe our bug
            LOG.debug("Command (" + originStmt.originStmt + ") process failed.", e);
            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
            throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.",
                    new AnalysisException(e.getMessage(), e));
        }
        return httpStreamParams;
    }

    private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws Exception {
        // Due to executing Nereids, it needs to be reset
        planner = null;
        context.getState().setNereids(false);
        context.setTxnEntry(null);
        context.setQueryId(queryId);
        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
        SqlScanner input = new SqlScanner(new StringReader(originStmt.originStmt),
                context.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        parsedStmt = SqlParserUtils.getFirstStmt(parser);
        if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
            if (!Config.wait_internal_group_commit_finish && ((NativeInsertStmt) parsedStmt).getLabel() != null) {
                throw new AnalysisException("label and group_commit can't be set at the same time");
            }
            ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
        }
        NativeInsertStmt insertStmt = (NativeInsertStmt) parsedStmt;
        analyze(context.getSessionVariable().toThrift());
        HttpStreamParams httpStreamParams = new HttpStreamParams();
        httpStreamParams.setTxnId(insertStmt.getTransactionId());
        httpStreamParams.setDb(insertStmt.getDbObj());
        httpStreamParams.setTable(insertStmt.getTargetTable());
        httpStreamParams.setLabel(insertStmt.getLabel());
        return httpStreamParams;
    }

    public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Exception {
        SessionVariable sessionVariable = context.getSessionVariable();
        HttpStreamParams httpStreamParams = null;
        try {
            try {
                // disable shuffle for http stream (only 1 sink)
                sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false");
                httpStreamParams = generateHttpStreamNereidsPlan(queryId);
            } catch (NereidsException | ParseException e) {
                if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
                    MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
                }
                // try to fall back to legacy planner
                if (LOG.isDebugEnabled()) {
                    LOG.debug("nereids cannot process statement\n{}\n because of {}",
                            originStmt.originStmt, e.getMessage(), e);
                }
                if (e instanceof NereidsException) {
                    LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
                    throw ((NereidsException) e).getException();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            // revert Session Value
            try {
                VariableMgr.revertSessionValue(sessionVariable);
                // origin value init
                sessionVariable.setIsSingleSetVar(false);
                sessionVariable.clearSessionOriginValue();
            } catch (DdlException e) {
                LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e);
                context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            }
        }
        return httpStreamParams;
    }

    public SummaryProfile getSummaryProfile() {
        return profile.getSummaryProfile();
    }

    public Profile getProfile() {
        return profile;
    }

    public void setProfileType(ProfileType profileType) {
        this.profileType = profileType;
    }


    public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
        this.proxyShowResultSet = proxyShowResultSet;
    }

    public ConnectContext getContext() {
        return context;
    }

    public OriginStatement getOriginStmt() {
        return originStmt;
    }

    public String getOriginStmtInString() {
        if (originStmt != null && originStmt.originStmt != null) {
            return originStmt.originStmt;
        }
        return "";
    }

    public List<ByteBuffer> getProxyQueryResultBufList() {
        return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList();
    }

    public void sendProxyQueryResult() throws IOException {
        if (masterOpExecutor == null) {
            return;
        }
        List<ByteBuffer> queryResultBufList = masterOpExecutor.getQueryResultBufList();
        for (ByteBuffer byteBuffer : queryResultBufList) {
            context.getMysqlChannel().sendOnePacket(byteBuffer);
        }
    }

    public String getPrepareStmtName() {
        return this.prepareStmtName;
    }
}