ConnectProcessor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.parser.SqlDialectHelper;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.proto.Data;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
/**
* Processes the commands of a single client connection; its life cycle is the same as that connection's.
*/
public abstract class ConnectProcessor {
// Protocol a processor instance serves; several methods in this class branch on it.
public enum ConnectType {
// MySQL protocol connection.
MYSQL,
// Arrow Flight SQL connection.
ARROW_FLIGHT_SQL
}
// Class-wide logger.
private static final Logger LOG = LogManager.getLogger(ConnectProcessor.class);
// Context of the connection this processor serves; assigned once in the constructor.
protected final ConnectContext ctx;
// Executor of the statement most recently started by executeQuery(); initially null.
protected StmtExecutor executor = null;
// Protocol served by this processor. Not assigned anywhere in this class
// (presumably set by subclasses -- TODO confirm).
protected ConnectType connectType;
// Executors collected by executeQuery() on Arrow Flight SQL connections whose result
// is not returned from local (see ctx.isReturnResultFromLocal()).
protected ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();
/**
 * Creates a processor bound to the given connection context.
 *
 * @param context the connection context this processor works on
 */
public ConnectProcessor(ConnectContext context) {
    ctx = context;
}
/**
 * Returns the connection context this processor was created with.
 *
 * @return the connection context
 */
public ConnectContext getConnectContext() {
    return this.ctx;
}
/**
 * Reports whether the current statement executor handled the query inside the FE.
 *
 * @return the executor's answer, or {@code false} when no executor has been created yet
 */
public boolean isHandleQueryInFe() {
    // executor stays null until the first statement is executed on this connection;
    // answer "no" instead of failing with a NullPointerException.
    return executor != null && executor.isHandleQueryInFe();
}
/**
 * Changes the current database (and optionally the current catalog) of this session.
 *
 * <p>{@code fullDbName} is either {@code db} or {@code catalog.db}. The outcome is reported
 * through the connection state: OK on success, an error state otherwise.
 *
 * @param fullDbName database name, optionally prefixed with a catalog name and a dot
 */
protected void handleInitDb(String fullDbName) {
    String catalogName = null;
    String dbName;
    String[] dbNames = fullDbName.split("\\.");
    if (dbNames.length == 0) {
        // A name made only of dots (e.g. ".") splits into nothing, so there is no database
        // name to switch to. Reject it here instead of passing null further down.
        ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR,
                ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(fullDbName));
        return;
    } else if (dbNames.length == 1) {
        dbName = fullDbName;
    } else if (dbNames.length == 2) {
        catalogName = dbNames[0];
        dbName = dbNames[1];
    } else {
        ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Only one dot can be in the name: " + fullDbName);
        return;
    }

    // mysql client
    if (Config.isCloudMode()) {
        try {
            dbName = ((CloudEnv) ctx.getEnv()).analyzeCloudCluster(dbName, ctx);
        } catch (DdlException e) {
            ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
            return;
        }
        // Nothing left to switch to after the cloud cluster analysis.
        if (dbName == null || dbName.isEmpty()) {
            return;
        }
    }

    // check catalog and db exists
    if (catalogName != null) {
        CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalog(catalogName);
        if (catalogIf == null || catalogIf.getDbNullable(dbName) == null) {
            ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR,
                    ErrorCode.ERR_BAD_DB_ERROR.formatErrorMsg(catalogName + "." + dbName));
            return;
        }
    }

    try {
        if (catalogName != null) {
            ctx.getEnv().changeCatalog(ctx, catalogName);
        }
        ctx.getEnv().changeDb(ctx, dbName);
    } catch (DdlException e) {
        ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
        return;
    } catch (Throwable t) {
        ctx.getState().setError(ErrorCode.ERR_INTERNAL_ERROR, Util.getRootCauseMessage(t));
        return;
    }
    ctx.getState().setOk();
}
/**
 * Handles a quit request: marks the connection context as killed and reports OK.
 */
protected void handleQuit() {
    ctx.setKilled();
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a ping request: no work is done, the state is simply set to OK.
 */
protected void handlePing() {
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a statistics request. Nothing is collected for now; the state is set to OK.
 */
protected void handleStatistics() {
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a debug request. Nothing is done for now; the state is set to OK.
 */
protected void handleDebug() {
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a reset-connection request: switches the default catalog back to the internal
 * catalog, clears the remembered last database of each catalog and reports OK.
 */
protected void handleResetConnection() {
    final String internalCatalog = InternalCatalog.INTERNAL_CATALOG_NAME;
    ctx.changeDefaultCatalog(internalCatalog);
    ctx.clearLastDBOfCatalog();
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a statement-reset request; only the state is set to OK.
 */
protected void handleStmtReset() {
    final QueryState state = ctx.getState();
    state.setOk();
}
/**
 * Handles a statement-close request by dropping the prepared statement registered under
 * the given id from the thread's current connection context.
 *
 * @param stmtId id of the prepared statement to remove
 */
protected void handleStmtClose(int stmtId) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("close stmt id: {}", stmtId);
    }
    final String stmtName = String.valueOf(stmtId);
    ConnectContext.get().removePrepareStmt(stmtName);
    // The client gets no response packet for this command, see
    // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_close.html
    final QueryState state = ctx.getState();
    state.setNoop();
}
/**
 * Tests whether the bit for {@code position} is set in a bitmap.
 *
 * <p>Bit {@code position} lives in byte {@code position / 8}, at bit index
 * {@code position & 7} counted from the least significant bit.
 *
 * @param bitmap   the bitmap bytes
 * @param position zero-based bit position to test
 * @return {@code true} if the bit at {@code position} is set
 */
protected static boolean isNull(byte[] bitmap, int position) {
    final int byteIndex = position / 8;
    final int bitMask = 1 << (position & 7);
    return (bitmap[byteIndex] & bitMask) != 0;
}
/**
 * Writes an audit log entry for an executed statement, unless
 * {@code Config.enable_bdbje_debug_mode} is on.
 *
 * @param origStmt            statement text to audit
 * @param parsedStmt          parsed form of the statement, may be {@code null}
 * @param statistics          query statistics to record, may be {@code null}
 * @param printFuzzyVariables passed through to the audit log helper
 */
protected void auditAfterExec(String origStmt, StatementBase parsedStmt,
        Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
    if (!Config.enable_bdbje_debug_mode) {
        AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables);
    }
}
/**
 * Handles a query request by delegating to {@link #executeQuery(String)}.
 *
 * <p>In cloud mode a non-root user is refused up front when the instance status is
 * OVERDUE. Failures other than {@link ConnectionException} are not propagated from here.
 *
 * @param originStmt the statement text received from the client
 * @throws ConnectionException only when there is a problem interacting with the requesting client
 */
protected void handleQuery(String originStmt) throws ConnectionException {
    if (Config.isCloudMode()
            && !ctx.getCurrentUserIdentity().isRootUser()
            && ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getInstanceStatus()
                    == Cloud.InstanceInfoPB.Status.OVERDUE) {
        Exception overdue = new Exception("warehouse is overdue!");
        try {
            handleQueryException(overdue, originStmt, null, null);
        } catch (ConnectionException ignore) {
            // ignore, should not happen
        }
        return;
    }

    try {
        executeQuery(originStmt);
    } catch (ConnectionException connectionFailure) {
        throw connectionFailure;
    } catch (UserException userFailure) {
        LOG.warn("execute query exception", userFailure);
    } catch (Exception ignored) {
        // saved use handleQueryException
    }
}
/**
* Parses a statement text into one or more statements and executes them in order.
*
* The text is first converted by SqlDialectHelper; statements come either from the SQL cache
* (when CacheAnalyzer allows it) or from NereidsParser. Each statement gets its own
* StmtExecutor, is audited after execution, and the loop stops at the first statement that
* leaves the connection state in ERR. Parse failures are reported through
* handleQueryException and make this method return without executing anything.
*
* @param originStmt statement text as received; may contain several statements
* @throws Exception whatever executor.execute() threw, rethrown after handleQueryException
*         has recorded it (or a ConnectionException raised by handleQueryException itself)
*/
public void executeQuery(String originStmt) throws Exception {
// Request metrics are only counted once MetricRepo is initialised and for non-internal sessions.
if (MetricRepo.isInit && !ctx.getSessionVariable().internalSession) {
MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
if (Config.isCloudMode()) {
try {
MetricRepo.increaseClusterRequestAll(ctx.getCloudCluster(false));
} catch (ComputeGroupException e) {
// Metrics are best effort: a failure here must not fail the query.
LOG.warn("metrics get cluster exception", e);
}
}
}
// The SQL hash is computed from the dialect-converted text, not from the raw input.
String convertedStmt = SqlDialectHelper.convertSqlByDialect(originStmt, ctx.getSessionVariable());
String sqlHash = DigestUtils.md5Hex(convertedStmt);
ctx.setSqlHash(sqlHash);
SessionVariable sessionVariable = ctx.getSessionVariable();
boolean wantToParseSqlFromSqlCache = CacheAnalyzer.canUseSqlCache(sessionVariable);
List<StatementBase> stmts = null;
long parseSqlStartTime = System.currentTimeMillis();
List<StatementBase> cachedStmts = null;
CacheKeyType cacheKeyType = null;
if (wantToParseSqlFromSqlCache) {
// NOTE(review): the cache lookup is keyed on originStmt while the parser below
// receives convertedStmt -- confirm this difference is intended.
cachedStmts = parseFromSqlCache(originStmt);
// parseFromSqlCache installed a fresh StatementContext on ctx; remember the cache key
// type it determined so it can be copied onto each executor's context below.
Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get()
.getStatementContext().getSqlCacheContext();
if (sqlCacheContext.isPresent()) {
cacheKeyType = sqlCacheContext.get().getCacheKeyType();
}
if (cachedStmts != null) {
stmts = cachedStmts;
}
}
// Cache miss (or cache not usable): parse the converted text.
if (stmts == null) {
try {
stmts = new NereidsParser().parseSQL(convertedStmt, sessionVariable);
} catch (NotSupportedException e) {
// Parse sql failed, audit it and return
handleQueryException(e, convertedStmt, null, null);
return;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Nereids parse sql failed. Reason: {}. Statement: \"{}\".",
e.getMessage(), convertedStmt, e);
}
// Any other parse failure is reported to the client as an AnalysisException.
// NOTE(review): this branch audits originStmt while the branch above audits
// convertedStmt -- confirm which one is intended.
Throwable exception = new AnalysisException(e.getMessage(), e);
handleQueryException(exception, originStmt, null, null);
return;
}
}
List<String> origSingleStmtList = null;
// if stmts.size() > 1, split originStmt to multi singleStmts
if (stmts.size() > 1) {
try {
origSingleStmtList = SqlUtils.splitMultiStmts(convertedStmt);
} catch (Exception ignore) {
// Best effort: on failure every statement is audited with the whole converted text.
LOG.warn("Try to parse multi origSingleStmt failed, originStmt: \"{}\"", convertedStmt);
}
}
long parseSqlFinishTime = System.currentTimeMillis();
// Per-statement texts are only used when the split produced exactly one text per statement.
boolean usingOrigSingleStmt = origSingleStmtList != null && origSingleStmtList.size() == stmts.size();
for (int i = 0; i < stmts.size(); ++i) {
String auditStmt = usingOrigSingleStmt ? origSingleStmtList.get(i) : convertedStmt;
if (stmts.size() > 1 && usingOrigSingleStmt) {
// Each statement of a multi-statement request gets its own hash.
ctx.setSqlHash(DigestUtils.md5Hex(auditStmt));
}
try {
ctx.getState().reset();
if (i > 0) {
ctx.resetReturnRows();
}
StatementBase parsedStmt = stmts.get(i);
// When auditStmt is already the single statement its index is 0; otherwise the index
// tells which statement of the whole text this is.
parsedStmt.setOrigStmt(new OriginStatement(auditStmt, usingOrigSingleStmt ? 0 : i));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
executor = new StmtExecutor(ctx, parsedStmt);
executor.getProfile().getSummaryProfile().setParseSqlStartTime(parseSqlStartTime);
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
ctx.setExecutor(executor);
if (cacheKeyType != null) {
// NOTE(review): Optional.get() is unchecked here; this assumes the executor's
// statement context always carries a SqlCacheContext when one was found above.
SqlCacheContext sqlCacheContext =
executor.getContext().getStatementContext().getSqlCacheContext().get();
sqlCacheContext.setCacheKeyType(cacheKeyType);
}
try {
executor.execute();
if (connectType.equals(ConnectType.MYSQL)) {
// Every statement but the last announces that more results follow.
if (i != stmts.size() - 1) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
if (ctx.getState().getStateType() != MysqlStateType.ERR) {
// here, doris do different with mysql.
// when client not request CLIENT_MULTI_STATEMENTS, mysql treat all query as
// single statement. Doris treat it with multi statement, but only return
// the last statement result.
if (getConnectContext().getMysqlChannel().clientMultiStatements()) {
finalizeCommand();
}
}
}
} else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
if (!ctx.isReturnResultFromLocal()) {
returnResultFromRemoteExecutor.add(executor);
}
Preconditions.checkState(ctx.getFlightSqlChannel().resultNum() <= 1);
// Only the last statement may produce a result on this protocol.
if (ctx.getFlightSqlChannel().resultNum() == 1 && i != stmts.size() - 1) {
String errMsg = "Only be one stmt that returns the result and it is at the end. "
+ "stmts.size(): " + stmts.size();
LOG.warn(errMsg);
ctx.getState().setError(ErrorCode.ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT, errMsg);
ctx.getState().setErrType(QueryState.ErrType.OTHER_ERR);
// NOTE(review): leaving the loop here skips auditAfterExec for this statement.
break;
}
}
auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
true);
// execute failed, skip remaining stmts
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
break;
}
} catch (Throwable throwable) {
// Record and audit the failure, then let it propagate to the caller.
handleQueryException(throwable, auditStmt, executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog());
// execute failed, skip remaining stmts
throw throwable;
}
} finally {
// Always close the statement context of the statement that just ran.
StatementContext statementContext = ctx.getStatementContext();
if (statementContext != null) {
statementContext.close();
}
}
}
}
/**
 * Tries to obtain the parsed statement for {@code originStmt} from the SQL cache.
 *
 * <p>A fresh {@link StatementContext} is always installed on the connection context first.
 * Any failure during the lookup is logged and treated as a cache miss.
 *
 * @param originStmt the statement text to look up
 * @return a single-element list holding the cached plan wrapped in a
 *         {@link LogicalPlanAdapter}, or {@code null} when the cache cannot be used
 */
private List<StatementBase> parseFromSqlCache(String originStmt) {
    StatementContext stmtCtx = new StatementContext(ctx, new OriginStatement(originStmt, 0));
    ctx.setStatementContext(stmtCtx);

    // the mysql protocol has different between COM_QUERY and COM_STMT_EXECUTE,
    // the sql cache use the result of COM_QUERY, so we can not provide the
    // result of sql cache for COM_STMT_EXECUTE/COM_STMT_PREPARE
    switch (ctx.getCommand()) {
        case COM_STMT_EXECUTE:
        case COM_STMT_PREPARE:
            return null;
        default:
            break;
    }

    try {
        Optional<Pair<ExplainOptions, String>> explain = NereidsParser.tryParseExplainPlan(originStmt);
        // For an EXPLAIN statement the cache is keyed on the explained SQL, not the whole text.
        String cacheSqlKey = explain.isPresent() ? explain.get().second : originStmt;

        Optional<LogicalSqlCache> cached = ctx.getEnv().getSqlCacheManager().tryParseSql(ctx, cacheSqlKey);
        if (!cached.isPresent()) {
            return null;
        }

        LogicalSqlCache sqlCache = cached.get();
        LogicalPlan plan = sqlCache;
        if (explain.isPresent()) {
            ExplainOptions options = explain.get().first;
            plan = new ExplainCommand(options.getExplainLevel(), plan, options.showPlanProcess());
        }

        LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan, stmtCtx);
        adapter.setColLabels(Lists.newArrayList(sqlCache.getColumnLabels()));
        adapter.setFieldInfos(Lists.newArrayList(sqlCache.getFieldInfos()));
        adapter.setResultExprs(sqlCache.getResultExprs());
        adapter.setOrigStmt(stmtCtx.getOriginStatement());
        adapter.setUserInfo(ctx.getCurrentUserIdentity());
        return ImmutableList.of(adapter);
    } catch (Throwable t) {
        LOG.warn("Parse from sql cache failed: " + t.getMessage(), t);
        return null;
    } finally {
        // Runs on every path that entered the try block, hit or miss.
        stmtCtx.releasePlannerResources();
    }
}
/**
 * Central handler for a failure raised while processing a statement; it exists to avoid one
 * big try/catch block at the call sites.
 *
 * <p>Saves the minidump if the context has one, translates the throwable into an error on
 * the connection state and writes an audit entry. A {@link ConnectionException} is not
 * translated: it is logged and rethrown so the connection can be closed outside.
 *
 * @param throwable  the failure to handle
 * @param origStmt   statement text to audit
 * @param parsedStmt parsed statement, may be {@code null}
 * @param statistics query statistics for the audit entry, may be {@code null}
 * @throws ConnectionException if {@code throwable} is a {@code ConnectionException}
 */
protected void handleQueryException(Throwable throwable, String origStmt,
        StatementBase parsedStmt, Data.PQueryStatistics statistics) throws ConnectionException {
    if (ctx.getMinidump() != null) {
        MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId()));
    }

    if (throwable instanceof ConnectionException) {
        // Throw this exception to close the connection outside.
        LOG.warn("Process one query failed because ConnectionException: ", throwable);
        throw (ConnectionException) throwable;
    }

    if (throwable instanceof IOException) {
        // Client failed.
        LOG.warn("Process one query failed because IOException: ", throwable);
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + throwable.getMessage());
    } else if (throwable instanceof UserException || throwable instanceof NotSupportedException) {
        LOG.warn("Process one query failed because.", throwable);
        ErrorCode errorCode = throwable instanceof UserException
                ? ((UserException) throwable).getMysqlErrorCode()
                : ErrorCode.ERR_NOT_SUPPORTED_YET;
        ctx.getState().setError(errorCode, throwable.getMessage());
        // set it as ANALYSIS_ERR so that it won't be treated as a query failure.
        ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
    } else {
        // Catch all throwable.
        // If reach here, maybe palo bug.
        LOG.warn("Process one query failed because unknown reason: ", throwable);
        String description = throwable.getClass().getSimpleName() + ", msg: " + throwable.getMessage();
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, description);
        if (parsedStmt instanceof KillStmt) {
            // ignore kill stmt execute err(not monitor it)
            ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
        }
    }

    auditAfterExec(origStmt, parsedStmt, statistics, true);
}
/**
 * Sends the column definitions of a table in the current database to the client.
 *
 * <p>The outcome is reported through the connection state: EOF after the fields were sent,
 * or an error when the table name is empty, the database or table is unknown, or sending
 * failed.
 *
 * @param tableName name of the table whose base schema is listed
 * @throws ConnectionException if a connection-level failure occurs while sending
 */
@SuppressWarnings("rawtypes")
protected void handleFieldList(String tableName) throws ConnectionException {
    // Already get command code.
    if (Strings.isNullOrEmpty(tableName)) {
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName");
        return;
    }
    DatabaseIf db = ctx.getCurrentCatalog().getDbNullable(ctx.getDatabase());
    if (db == null) {
        ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Unknown database(" + ctx.getDatabase() + ")");
        return;
    }
    TableIf table = db.getTableNullable(tableName);
    if (table == null) {
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Unknown table(" + tableName + ")");
        return;
    }

    table.readLock();
    try {
        if (connectType.equals(ConnectType.MYSQL)) {
            MysqlChannel channel = ctx.getMysqlChannel();
            MysqlSerializer serializer = channel.getSerializer();

            // Send fields
            // NOTE: Field list doesn't send number of fields
            List<Column> baseSchema = table.getBaseSchema();
            for (Column column : baseSchema) {
                serializer.reset();
                serializer.writeField(db.getFullName(), table.getName(), column, true);
                channel.sendOnePacket(serializer.toByteBuffer());
            }
        } else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
            // TODO
        }
    } catch (Throwable throwable) {
        handleQueryException(throwable, "", null, null);
        // handleQueryException has put the failure into the connection state; return here so
        // the EOF below does not overwrite it and hide the error from the client.
        return;
    } finally {
        table.readUnlock();
    }
    ctx.getState().setEof();
}
/**
 * Serializes the response packet of the current connection state. Only valid for the
 * MySQL protocol.
 *
 * @return the serialized packet, or {@code null} when the state yields no response packet
 * @throws IllegalStateException if this processor does not serve the MySQL protocol
 */
protected ByteBuffer getResultPacket() {
    Preconditions.checkState(connectType.equals(ConnectType.MYSQL));
    MysqlPacket response = ctx.getState().toResponsePacket();
    if (response != null) {
        MysqlSerializer out = ctx.getMysqlChannel().getSerializer();
        out.reset();
        response.writeTo(out);
        return out.toByteBuffer();
    }
    // possible two cases:
    // 1. handler has send request
    // 2. this command need not to send response
    return null;
}
/**
 * Sends the response packet for the command that has just been processed. Only valid for
 * the MySQL protocol.
 *
 * <p>When the statement was forwarded to the master and did not fail, the forwarded result
 * (show result set or proxied query result) is sent first. The profile is updated only
 * after the response has been flushed to the client.
 *
 * @throws IOException if writing to the client channel fails
 * @throws IllegalStateException if this processor does not serve the MySQL protocol
 */
public void finalizeCommand() throws IOException {
    // Guard the debug calls: formatting the query id is wasted work on every command
    // when debug logging is disabled.
    if (LOG.isDebugEnabled()) {
        LOG.debug("Finalize command for query {}", DebugUtil.printId(ctx.queryId));
    }
    Preconditions.checkState(connectType.equals(ConnectType.MYSQL));

    ByteBuffer packet;
    if (executor != null && executor.isForwardToMaster()
            && ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
        ShowResultSet resultSet = executor.getShowResultSet();
        if (resultSet == null) {
            executor.sendProxyQueryResult();
            packet = executor.getOutputPacket();
        } else {
            executor.sendResultSet(resultSet);
            packet = getResultPacket();
        }
    } else {
        packet = getResultPacket();
    }

    if (packet == null) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("packet == null");
        }
        return;
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Send to mysql channel for query {}", DebugUtil.printId(ctx.queryId));
    }
    MysqlChannel channel = ctx.getMysqlChannel();
    channel.sendAndFlush(packet);

    // note(wb) we should write profile after return result to mysql client
    // because write profile maybe take too much time
    // explain query stmt do not have profile
    if (executor != null && executor.getParsedStmt() != null && !executor.getParsedStmt().isExplain()
            && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile
            || executor.getParsedStmt() instanceof LogicalPlanAdapter
            || executor.getParsedStmt() instanceof InsertStmt)) {
        executor.updateProfile(true);
        StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator();
        if (statsErrorEstimator != null) {
            statsErrorEstimator.updateProfile(ConnectContext.get().queryId());
        }
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("End finalizing command for query {}", DebugUtil.printId(ctx.queryId));
    }
}
/**
* Executes a statement carried by a TMasterOpRequest on this FE and packs the outcome into a
* TMasterOpResult.
*
* The session described by the request (database, user, variables, catalog, transaction info)
* is first copied onto ctx, then the statement is executed either as a prepare+execute pair
* (when a prepare-execute buffer is present) or through executor.queryRetry(). Execution
* failures are not thrown: they are recorded in the connection state and returned in the result.
*
* @param request the forwarded operation
* @return the result to send back; always built, whether execution succeeded or failed
* @throws TException if the forwarded user variables cannot be converted
*/
public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException {
// Rebuild the requester's session on this context.
ctx.setDatabase(request.db);
ctx.setQualifiedUser(request.user);
ctx.setEnv(Env.getCurrentEnv());
ctx.getState().reset();
if (request.isSetUserIp()) {
ctx.setRemoteIP(request.getUserIp());
}
if (request.isSetStmtId()) {
ctx.setForwardedStmtId(request.getStmtId());
}
if (request.isSetCurrentUserIdent()) {
UserIdentity currentUserIdentity = UserIdentity.fromThrift(request.getCurrentUserIdent());
ctx.setCurrentUserIdentity(currentUserIdentity);
}
// NOTE(review): this tests the field's value rather than whether it is set, so the
// session variable is only ever switched on here -- confirm this is intended.
if (request.isFoldConstantByBe()) {
ctx.getSessionVariable().setEnableFoldConstantByBe(request.foldConstantByBe);
}
if (request.isSetSessionVariables()) {
ctx.getSessionVariable().setForwardedSessionVariables(request.getSessionVariables());
}
if (request.isSetUserVariables()) {
ctx.setUserVars(userVariableFromThrift(request.getUserVariables()));
}
// set compute group
ctx.setComputeGroup(Env.getCurrentEnv().getAuth().getComputeGroup(ctx.qualifiedUser));
ctx.setThreadLocalInfo();
// Local executor: it shadows the field of the same name, which this method does not touch.
StmtExecutor executor = null;
try {
// 0 for compatibility.
int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
executor = new StmtExecutor(ctx, new OriginStatement(request.getSql(), idx), true);
ctx.setExecutor(executor);
// Set default catalog only if the catalog exists.
if (request.isSetDefaultCatalog()) {
CatalogIf catalog = ctx.getEnv().getCatalogMgr().getCatalog(request.getDefaultCatalog());
if (catalog != null) {
ctx.getEnv().changeCatalog(ctx, request.getDefaultCatalog());
// Set default db only when the default catalog is set and the dbname exists in default catalog.
if (request.isSetDefaultDatabase()) {
DatabaseIf db = ctx.getCurrentCatalog().getDbNullable(request.getDefaultDatabase());
if (db != null) {
ctx.getEnv().changeDb(ctx, request.getDefaultDatabase());
}
}
}
}
// set transaction entry for transaction load
if (request.isSetTxnLoadInfo()) {
TransactionEntry transactionEntry = new TransactionEntry();
transactionEntry.setTxnInfoInMaster(request.getTxnLoadInfo());
ctx.setTxnEntry(transactionEntry);
}
TUniqueId queryId; // This query id will be set in ctx
if (request.isSetQueryId()) {
queryId = request.getQueryId();
} else {
// The requester supplied no id: generate a random one.
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}
if (request.isSetPrepareExecuteBuffer()) {
// Prepared statement: run the prepare step first, then execute it with the
// parameter buffer carried by the request.
ctx.setCommand(MysqlCommand.COM_STMT_PREPARE);
executor.execute();
ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE);
String preparedStmtId = executor.getPrepareStmtName();
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(preparedStmtId);
if (preparedStatementContext == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Something error, just support nereids preparedStmtId:{}", preparedStmtId);
}
throw new RuntimeException("Prepare failed when proxy execute");
}
// The buffer is read in little-endian byte order.
handleExecute(preparedStatementContext.command, Long.parseLong(preparedStmtId),
preparedStatementContext,
ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN), queryId);
} else {
executor.queryRetry(queryId);
}
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + e.getMessage());
} catch (Throwable e) {
// Catch all throwable.
// If reach here, maybe Doris bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
}
// no matter the master execute success or fail, the master must transfer the result to follower
// and tell the follower the current journalID.
TMasterOpResult result = new TMasterOpResult();
if (ctx.queryId() != null
// If none master FE not set query id or query id was reset in StmtExecutor
// when a query exec more than once, return it to none master FE.
&& (!request.isSetQueryId() || !request.getQueryId().equals(ctx.queryId()))
) {
result.setQueryId(ctx.queryId());
}
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
// getResultPacket() checks that connectType is MYSQL before building the packet.
result.setPacket(getResultPacket());
result.setStatus(ctx.getState().toString());
// Status code 0 means OK; otherwise the state's error code, or ERR_UNKNOWN_ERROR when none is set.
if (ctx.getState().getStateType() == MysqlStateType.OK) {
result.setStatusCode(0);
} else {
ErrorCode errorCode = ctx.getState().getErrorCode();
if (errorCode != null) {
result.setStatusCode(errorCode.getCode());
} else {
result.setStatusCode(ErrorCode.ERR_UNKNOWN_ERROR.getCode());
}
result.setErrMessage(ctx.getState().getErrorMessage());
}
if (request.isSetTxnLoadInfo()) {
TransactionEntry transactionEntry = ConnectContext.get().getTxnEntry();
// null if this is a commit or rollback command
if (transactionEntry != null) {
result.setTxnLoadInfo(transactionEntry.getTxnInfoInMaster());
}
}
// Attach the produced rows: a show result set takes precedence over buffered query results.
if (executor != null) {
if (executor.getProxyShowResultSet() != null) {
result.setResultSet(executor.getProxyShowResultSet().tothrift());
} else if (!executor.getProxyQueryResultBufList().isEmpty()) {
result.setQueryResultBufList(executor.getProxyQueryResultBufList());
}
}
return result;
}
/**
 * Processes one request of the connection. Only meaningful for the MySQL protocol; this
 * base implementation always refuses.
 *
 * @throws IOException declared for implementations that talk to the client
 * @throws NotImplementedException always, in this base implementation
 */
public void processOnce() throws IOException, NotImplementedException {
    final String reason = "Not Impl processOnce";
    throw new NotImplementedException(reason);
}
/**
 * Converts user variables received over thrift into literal expressions.
 *
 * @param thriftMap variable name to thrift expression node
 * @return a new map from variable name to the corresponding literal expression
 * @throws TException if a node cannot be converted; the original
 *         {@link AnalysisException} is kept as the cause
 */
private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> thriftMap) throws TException {
    try {
        Map<String, LiteralExpr> userVariables = Maps.newHashMap();
        for (Map.Entry<String, TExprNode> entry : thriftMap.entrySet()) {
            userVariables.put(entry.getKey(), LiteralExpr.getLiteralExprFromThrift(entry.getValue()));
        }
        return userVariables;
    } catch (AnalysisException e) {
        // Pass the original exception as the cause so its stack trace is not lost.
        throw new TException(e.getMessage(), e);
    }
}
/**
 * Executes a prepared statement. This base implementation always refuses; the message
 * names MysqlConnectProcessor as the only processor that supports it.
 *
 * @param prepareCommand the prepared command to run
 * @param stmtId         id of the prepared statement
 * @param prepCtx        context of the prepared statement
 * @param packetBuf      buffer carrying the execute request
 * @param queryId        query id to use for the execution
 * @throws NotSupportedException always, in this base implementation
 */
protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
        ByteBuffer packetBuf, TUniqueId queryId) {
    final String reason = "Just MysqlConnectProcessor support execute";
    throw new NotSupportedException(reason);
}
}