TableQueryPlanAction.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.rest;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DorisHttpException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TMemoryScratchSink;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TQueryPlanInfo;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTabletVersionInfo;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * This class is responsible for parsing the SQL and generating the query plan fragment for a single
 * table (an OlapTable). The related tablets may be pruned by the query planner according to the
 * `where` predicate. An example request and response is sketched in the comment after the query_plan method below.
 */
@RestController
public class TableQueryPlanAction extends RestBaseController {
public static final Logger LOG = LogManager.getLogger(TableQueryPlanAction.class);
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_query_plan",
method = {RequestMethod.GET, RequestMethod.POST})
public Object query_plan(
@PathVariable(value = DB_KEY) final String dbName,
@PathVariable(value = TABLE_KEY) final String tblName,
HttpServletRequest request, HttpServletResponse response) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
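        // authenticate the request before doing anything else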
executeCheckPassword(request, response);
        // top-level holder for the response payload (status plus plan/partitions, or the error message)
Map<String, Object> resultMap = new HashMap<>(4);
try {
String postContent = HttpUtils.getBody(request);
            // maybe this common validation logic should be moved to a base class
if (Strings.isNullOrEmpty(postContent)) {
return ResponseEntityBuilder.badRequest("POST body must contains [sql] root object");
}
JSONObject jsonObject = (JSONObject) JSONValue.parse(postContent);
if (jsonObject == null) {
return ResponseEntityBuilder.badRequest("malformed json: " + postContent);
}
String sql = (String) jsonObject.get("sql");
if (Strings.isNullOrEmpty(sql)) {
return ResponseEntityBuilder.badRequest("POST body must contains [sql] root object");
}
LOG.info("receive SQL statement [{}] from external service [ user [{}]] for database [{}] table [{}]",
sql, ConnectContext.get().getCurrentUserIdentity(), dbName, tblName);
String fullDbName = getFullDbName(dbName);
// check privilege for select, otherwise return HTTP 401
checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tblName, PrivPredicate.SELECT);
Table table;
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(fullDbName);
table = db.getTableOrMetaException(tblName, TableIf.TableType.OLAP);
} catch (MetaNotFoundException e) {
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
}
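            // hold the table read lock so the table meta stays stable while planning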
table.readLock();
try {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null) {
                    // Disable the two-phase read optimization, since it is not fully supported here
                    // TODO: support it
ConnectContext.get().getSessionVariable().setEnableTwoPhaseReadOpt(false);
}
                if (Config.isCloudMode()) { // Choose a cluster for this query
ConnectContext.get().getCloudCluster();
}
// parse/analysis/plan the sql and acquire tablet distributions
handleQuery(ConnectContext.get(), fullDbName, tblName, sql, resultMap);
} finally {
table.readUnlock();
}
} catch (DorisHttpException e) {
            // the status code should conform to HTTP semantics
resultMap.put("status", e.getCode().code());
resultMap.put("exception", e.getMessage());
} catch (Exception e) {
resultMap.put("status", "1");
resultMap.put("exception", e.getMessage());
}
return ResponseEntityBuilder.ok(resultMap);
}
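    /*
     * Example request and response for query_plan (illustrative values only; table, host and tablet id are
     * made up, and the map below is wrapped by the standard ResponseEntityBuilder envelope):
     *
     *   POST /api/db1/tbl1/_query_plan
     *   {"sql": "select k1, k2 from db1.tbl1 where k1 = 1"}
     *
     *   "data": {
     *       "status": 200,
     *       "opaqued_query_plan": "<Base64-encoded Thrift TQueryPlanInfo>",
     *       "partitions": {
     *           "10034": {"routings": ["host1:9060"], "version": 2, "schemaHash": 0}
     *       }
     *   }
     */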
    /**
     * process the sql, validate it against the requested table and fill the result map with
     * the resolved pruned tablets and the opaque query plan
     *
     * @param context context for the analyzer
     * @param requestDb the database name taken from the HTTP path
     * @param requestTable the table name taken from the HTTP path
     * @param sql the single-table select statement
     * @param result the map the acquired results are written into
     * @throws DorisHttpException if the statement is not a plain single-table select or planning fails
     */
private void handleQuery(ConnectContext context, String requestDb, String requestTable, String sql,
Map<String, Object> result) throws DorisHttpException {
List<StatementBase> stmts = null;
SessionVariable sessionVariable = context.getSessionVariable();
boolean needSetParallelResultSinkToFalse = false;
try {
try {
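                // temporarily enable the parallel result sink for planning; the finally block restores the original setting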
if (!sessionVariable.enableParallelResultSink()) {
sessionVariable.setParallelResultSink(true);
needSetParallelResultSinkToFalse = true;
}
stmts = new NereidsParser().parseSQL(sql, context.getSessionVariable());
} catch (Exception e) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage());
}
// the parsed logical statement
StatementBase query = stmts.get(0);
if (!(query instanceof LogicalPlanAdapter)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}
LogicalPlan parsedPlan = ((LogicalPlanAdapter) query).getLogicalPlan();
            // only select semantics are processed; commands are rejected
if (parsedPlan instanceof Command) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement needed, but found [" + sql + " ]");
}
if (!parsedPlan.collectToList(LogicalSubQueryAlias.class::isInstance).isEmpty()) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must not embed another statement");
}
List<UnboundRelation> unboundRelations = parsedPlan.collectToList(UnboundRelation.class::isInstance);
if (unboundRelations.size() != 1) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"Select statement must have only one table");
}
            // check that the table referenced in the SQL is consistent with the HTTP-requested resource;
            // when they match, the privilege check already done on the requested table also covers the SQL
List<String> tableQualifier = RelationUtil.getQualifierName(context,
unboundRelations.get(0).getNameParts());
if (tableQualifier.size() != 3) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"can't find table " + String.join(",", tableQualifier));
}
String dbName = tableQualifier.get(1);
String tableName = tableQualifier.get(2);
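            // lower_case_table_names == 0 means object names are case sensitive, so require an exact match;
            // otherwise compare case-insensitively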
if (GlobalVariable.lowerCaseTableNames == 0) {
if (!(dbName.equals(requestDb) && tableName.equals(requestTable))) {
                    throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
                            "requested database and table must be consistent with sql: request ["
                                    + requestDb + "." + requestTable + "] and sql [" + dbName
                                    + "." + tableName + "]");
}
} else {
if (!(dbName.equalsIgnoreCase(requestDb)
&& tableName.equalsIgnoreCase(requestTable))) {
                    throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
                            "requested database and table must be consistent with sql: request ["
                                    + requestDb + "." + requestTable + "] and sql [" + dbName
                                    + "." + tableName + "]");
}
}
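            // rewrite the parsed plan (logical rewrite only, no physical planning yet) to check that
            // the query is a plain single-table filter/project/scan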
NereidsPlanner nereidsPlanner = new NereidsPlanner(context.getStatementContext());
LogicalPlan rewrittenPlan = (LogicalPlan) nereidsPlanner.planWithLock(parsedPlan,
PhysicalProperties.GATHER, ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
if (!rewrittenPlan.allMatch(planTreeNode -> planTreeNode instanceof LogicalOlapScan
|| planTreeNode instanceof LogicalFilter || planTreeNode instanceof LogicalProject
|| planTreeNode instanceof LogicalResultSink || planTreeNode instanceof LogicalEmptyRelation)) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"only support single table filter-prune-scan, but found [ " + sql + "]");
}
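            // run the full planning to produce the executable fragment and the pruned scan ranges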
NereidsPlanner planner = new NereidsPlanner(context.getStatementContext());
try {
planner.plan(query, context.getSessionVariable().toThrift());
} catch (Exception ex) {
throw new DorisHttpException(HttpResponseStatus.BAD_REQUEST,
"only support single table filter-prune-scan, but found [ " + sql + "]");
}
            // acquire the ScanNode to obtain the pruned tablets;
            // a valid single-table plan produces at most one ScanNode
List<ScanNode> scanNodes = planner.getScanNodes();
if (scanNodes.size() > 1) {
                throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
                        "Planner should plan exactly one ScanNode but found [" + scanNodes.size() + "]");
}
List<TScanRangeLocations> scanRangeLocations = scanNodes.size() == 1
? scanNodes.get(0).getScanRangeLocations(0) : new ArrayList<>();
            // acquire the PlanFragment, which is the executable template
List<PlanFragment> fragments = planner.getFragments();
if (fragments.size() != 1) {
                throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
                        "Planner should plan exactly one PlanFragment but found [" + fragments.size() + "]");
}
TQueryPlanInfo tQueryPlanInfo = new TQueryPlanInfo();
// acquire TPlanFragment
TPlanFragment tPlanFragment = fragments.get(0).toThrift();
// set up TMemoryScratchSink
TDataSink tDataSink = new TDataSink();
tDataSink.type = TDataSinkType.MEMORY_SCRATCH_SINK;
tDataSink.memory_scratch_sink = new TMemoryScratchSink();
tPlanFragment.output_sink = tDataSink;
tQueryPlanInfo.plan_fragment = tPlanFragment;
tQueryPlanInfo.desc_tbl = planner.getDescTable().toThrift();
// set query_id
UUID uuid = UUID.randomUUID();
tQueryPlanInfo.query_id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
Map<Long, TTabletVersionInfo> tabletInfo = new HashMap<>();
// acquire resolved tablet distribution
Map<String, Node> tabletRoutings = assemblePrunedPartitions(scanRangeLocations);
tabletRoutings.forEach((tabletId, node) -> {
long tablet = Long.parseLong(tabletId);
tabletInfo.put(tablet, new TTabletVersionInfo(tablet, node.version,
0L /*version hash*/, node.schemaHash));
});
tQueryPlanInfo.tablet_info = tabletInfo;
            // serialize TQueryPlanInfo and Base64-encode it so the plan can travel as a string inside the JSON response
TSerializer serializer;
String opaquedQueryPlan;
try {
serializer = new TSerializer();
byte[] queryPlanStream = serializer.serialize(tQueryPlanInfo);
opaquedQueryPlan = Base64.getEncoder().encodeToString(queryPlanStream);
} catch (TException e) {
throw new DorisHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR,
"TSerializer failed to serialize PlanFragment, reason [ " + e.getMessage() + " ]");
}
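            // A consumer of this API would typically recover the plan on its side roughly as follows
            // (sketch only, not executed here):
            //     byte[] planBytes = Base64.getDecoder().decode(opaquedQueryPlan);
            //     TQueryPlanInfo recovered = new TQueryPlanInfo();
            //     new org.apache.thrift.TDeserializer().deserialize(recovered, planBytes);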
result.put("partitions", tabletRoutings);
result.put("opaqued_query_plan", opaquedQueryPlan);
result.put("status", 200);
} finally {
if (needSetParallelResultSinkToFalse) {
sessionVariable.setParallelResultSink(false);
}
}
}
    /**
     * acquire the routing of all involved (already pruned) tablets
     *
     * @param scanRangeLocationsList the pruned scan ranges returned by the planner
     * @return a map from tablet id to its routing (replica addresses and tablet version)
     */
private Map<String, Node> assemblePrunedPartitions(List<TScanRangeLocations> scanRangeLocationsList) {
Map<String, Node> result = new HashMap<>();
for (TScanRangeLocations scanRangeLocations : scanRangeLocationsList) {
// only process palo(doris) scan range
TPaloScanRange scanRange = scanRangeLocations.scan_range.palo_scan_range;
Node tabletRouting = new Node(Long.parseLong(scanRange.version), 0 /* schema hash is not used */);
for (TNetworkAddress address : scanRange.hosts) {
tabletRouting.addRouting(NetUtils
.getHostPortInAccessibleFormat(address.hostname, address.port));
}
result.put(String.valueOf(scanRange.tablet_id), tabletRouting);
}
return result;
}
// helper class for json transformation
final class Node {
// ["host1:port1", "host2:port2", "host3:port3"]
public List<String> routings = new ArrayList<>();
public long version;
public int schemaHash;
public Node(long version, int schemaHash) {
this.version = version;
this.schemaHash = schemaHash;
}
private void addRouting(String routing) {
routings.add(routing);
}
}
}