ExecuteCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.PointQueryExecutor;
import org.apache.doris.qe.PreparedStatementContext;
import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Prepared Statement
*/
public class ExecuteCommand extends Command {
private final String stmtName;
private final PrepareCommand prepareCommand;
private final StatementContext statementContext;
public ExecuteCommand(String stmtName, PrepareCommand prepareCommand, StatementContext statementContext) {
super(PlanType.EXECUTE_COMMAND);
this.stmtName = stmtName;
this.prepareCommand = prepareCommand;
this.statementContext = statementContext;
}
public String getStmtName() {
return stmtName;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visit(this, context);
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
StatementContext statementContext = ctx.getStatementContext();
statementContext.setPrepareStage(false);
PreparedStatementContext preparedStmtCtx = ctx.getPreparedStementContext(stmtName);
if (null == preparedStmtCtx) {
throw new AnalysisException(
"prepare statement " + stmtName + " not found, maybe expired");
}
PrepareCommand prepareCommand = preparedStmtCtx.command;
LogicalPlan logicalPlan = prepareCommand.getLogicalPlan();
if (logicalPlan instanceof LogicalSqlCache) {
throw new AnalysisException("Unsupported sql cache for server prepared statement");
}
LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(
logicalPlan, executor.getContext().getStatementContext());
executor.setParsedStmt(planAdapter);
// If it's not a short circuit query or schema version is different(indicates schema changed) or
// has nondeterministic functions in statement, then need to do reanalyze and plan
if (executor.getContext().getStatementContext().isShortCircuitQuery()
&& preparedStmtCtx.shortCircuitQueryContext.isPresent()
&& preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion()
== preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion && !executor.getContext()
.getStatementContext().hasNondeterministic()) {
PointQueryExecutor.directExecuteShortCircuitQuery(executor, preparedStmtCtx, statementContext);
return;
}
if (ctx.getSessionVariable().enableGroupCommitFullPrepare) {
if (preparedStmtCtx.groupCommitPlanner.isPresent()) {
OlapGroupCommitInsertExecutor.fastAnalyzeGroupCommit(ctx, prepareCommand);
} else {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, prepareCommand);
}
if (ctx.isGroupCommit()) {
GroupCommitPlanner.executeGroupCommitInsert(ctx, preparedStmtCtx, statementContext);
return;
}
}
// execute real statement
preparedStmtCtx.shortCircuitQueryContext = Optional.empty();
statementContext.setShortCircuitQueryContext(null);
executor.execute();
if (executor.getContext().getStatementContext().isShortCircuitQuery()) {
// cache short-circuit plan
preparedStmtCtx.shortCircuitQueryContext = Optional.of(
new ShortCircuitQueryContext(executor.planner(), (Queriable) executor.getParsedStmt()));
statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get());
}
}
/**
* return the sql representation contains real expr instead of placeholders
*/
public String toSql() {
// maybe slow
List<Expression> realValueExpr = prepareCommand.getPlaceholders().stream()
.map(placeholder -> statementContext.getIdToPlaceholderRealExpr().get(placeholder.getPlaceholderId()))
.collect(Collectors.toList());
return "EXECUTE `" + stmtName + "`"
+ realValueExpr.stream().map(Expression::toSql).collect(Collectors.joining(", ", " USING ", ""));
}
@Override
public StmtType stmtType() {
return StmtType.EXECUTE;
}
}