GroupCommitPlanner.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.load.NereidsStreamLoadPlanner;
import org.apache.doris.nereids.load.NereidsStreamLoadTask;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.PreparedStatementContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
// Used to generate a plan fragment for a group commit
// we only support OlapTable now.
public class GroupCommitPlanner {
private static final Logger LOG = LogManager.getLogger(GroupCommitPlanner.class);
public static final String SCHEMA_CHANGE = " is blocked on schema change";
private static final int MAX_RETRY = 3;
private Database db;
private OlapTable table;
public int baseSchemaVersion;
private int targetColumnSize;
private TUniqueId loadId;
private long backendId;
private ByteString execPlanFragmentParamsBytes;
public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId,
String groupCommit)
throws UserException, TException {
this.db = db;
this.table = table;
this.baseSchemaVersion = table.getBaseSchemaVersion();
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
String msg = "insert table " + this.table.getId() + SCHEMA_CHANGE;
LOG.info(msg);
throw new DdlException(msg);
}
TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
if (targetColumnNames != null) {
streamLoadPutRequest.setColumns("`" + String.join("`,`", targetColumnNames) + "`");
if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
}
}
streamLoadPutRequest
.setDb(db.getFullName())
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0
: ConnectContext.get().getSessionVariable().insertMaxFilterRatio)
.setTbl(table.getName())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
.setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit)
.setStrictMode(ConnectContext.get().getSessionVariable().enableInsertStrict);
NereidsStreamLoadTask streamLoadTask = NereidsStreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner(db, table, streamLoadTask);
// Will using load id as query id in fragment
// TODO support pipeline
TPipelineFragmentParams tRequest = planner.plan(streamLoadTask.getId());
for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
.per_node_scan_ranges.entrySet()) {
for (TScanRangeParams scanRangeParams : entry.getValue()) {
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
TFileFormatType.FORMAT_PROTO);
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
TFileCompressType.PLAIN);
}
}
List<TScanRangeParams> scanRangeParams = tRequest.local_params.get(0).per_node_scan_ranges.values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
Preconditions.checkState(scanRangeParams.size() == 1);
loadId = queryId;
// see BackendServiceProxy#execPlanFragmentsAsync
TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
paramsList.addToParamsList(tRequest);
execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList));
}
public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
List<InternalService.PDataRow> rows)
throws DdlException, RpcException, ExecutionException, InterruptedException, LoadException {
Backend backend = Env.getCurrentEnv().getGroupCommitManager().selectBackendForGroupCommit(table.getId(), ctx);
backendId = backend.getId();
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_3).build())
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
.build()).addAllData(rows)
.build();
LOG.info("query_id={}, rows={}, reuse group commit query_id={} ", DebugUtil.printId(ctx.queryId()),
rows.size(), DebugUtil.printId(loadId));
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
return future.get();
}
public long getBackendId() {
return backendId;
}
public List<InternalService.PDataRow> getRows(NativeInsertStmt stmt) throws UserException {
List<InternalService.PDataRow> rows = new ArrayList<>();
SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt());
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
rows.add(getOneRow(row));
}
} else {
List<Expr> exprList = new ArrayList<>();
for (Expr resultExpr : selectStmt.getResultExprs()) {
if (resultExpr instanceof SlotRef) {
exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0));
} else {
exprList.add(resultExpr);
}
}
rows.add(getOneRow(exprList));
}
return rows;
}
private static InternalService.PDataRow getOneRow(List<Expr> row) throws UserException {
InternalService.PDataRow data = StmtExecutor.getRowStringValue(row, FormatOptions.getDefault());
if (LOG.isDebugEnabled()) {
LOG.debug("add row: [{}]", data.getColList().stream().map(c -> c.getValue())
.collect(Collectors.joining(",")));
}
return data;
}
private static List<InternalService.PDataRow> getRows(int targetColumnSize, List<Expr> rows) throws UserException {
List<InternalService.PDataRow> data = new ArrayList<>();
for (int i = 0; i < rows.size(); i += targetColumnSize) {
List<Expr> row = rows.subList(i, Math.min(i + targetColumnSize, rows.size()));
data.add(getOneRow(row));
}
return data;
}
// prepare command
public static void executeGroupCommitInsert(ConnectContext ctx, PreparedStatementContext preparedStmtCtx,
StatementContext statementContext) throws Exception {
PrepareCommand prepareCommand = preparedStmtCtx.command;
InsertIntoTableCommand command = (InsertIntoTableCommand) (prepareCommand.getLogicalPlan());
OlapTable table = (OlapTable) command.getTable(ctx);
for (int retry = 0; retry < MAX_RETRY; retry++) {
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
String msg = "insert table " + table.getId() + SCHEMA_CHANGE;
LOG.info(msg);
throw new DdlException(msg);
}
boolean reuse = false;
GroupCommitPlanner groupCommitPlanner;
if (preparedStmtCtx.groupCommitPlanner.isPresent()
&& table.getId() == preparedStmtCtx.groupCommitPlanner.get().table.getId()
&& table.getBaseSchemaVersion() == preparedStmtCtx.groupCommitPlanner.get().baseSchemaVersion) {
groupCommitPlanner = preparedStmtCtx.groupCommitPlanner.get();
reuse = true;
} else {
// call nereids planner to check to sql
command.initPlan(ctx, new StmtExecutor(new ConnectContext(), ""), false);
List<String> targetColumnNames = command.getTargetColumns();
groupCommitPlanner = EnvFactory.getInstance()
.createGroupCommitPlanner((Database) table.getDatabase(), table,
targetColumnNames, ctx.queryId(),
ConnectContext.get().getSessionVariable().getGroupCommit());
// TODO use planner column size
groupCommitPlanner.targetColumnSize = targetColumnNames == null ? table.getBaseSchema().size() :
targetColumnNames.size();
preparedStmtCtx.groupCommitPlanner = Optional.of(groupCommitPlanner);
}
if (statementContext.getIdToPlaceholderRealExpr().size() % groupCommitPlanner.targetColumnSize != 0) {
throw new DdlException("Column size: " + statementContext.getIdToPlaceholderRealExpr().size()
+ " does not match with target column size: " + groupCommitPlanner.targetColumnSize);
}
List<Expr> valueExprs = statementContext.getIdToPlaceholderRealExpr().values().stream()
.map(v -> ((Literal) v).toLegacyLiteral()).collect(Collectors.toList());
List<InternalService.PDataRow> rows = getRows(groupCommitPlanner.targetColumnSize, valueExprs);
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
Pair<Boolean, Boolean> needRetryAndReplan = groupCommitPlanner.handleResponse(ctx, retry + 1 < MAX_RETRY,
reuse, response);
if (needRetryAndReplan.first) {
if (needRetryAndReplan.second) {
preparedStmtCtx.groupCommitPlanner = Optional.empty();
}
} else {
break;
}
}
}
// return <need_retry, need_replan>
private Pair<Boolean, Boolean> handleResponse(ConnectContext ctx, boolean canRetry, boolean reuse,
PGroupCommitInsertResponse response) throws DdlException {
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
if (canRetry && code != TStatusCode.OK && !errorMsgsList.isEmpty()) {
if (errorMsgsList.get(0).contains("schema version not match")) {
LOG.info("group commit insert failed. query: {}, db: {}, table: {}, schema version: {}, "
+ "backend: {}, status: {}", DebugUtil.printId(ctx.queryId()), db.getId(),
table.getId(), baseSchemaVersion, backendId, response.getStatus());
return Pair.of(true, true);
} else if (errorMsgsList.get(0).contains("can not get a block queue")) {
return Pair.of(true, false);
}
}
if (code != TStatusCode.OK) {
handleInsertFailed(ctx, response);
} else {
setReturnInfo(ctx, reuse, response);
}
return Pair.of(false, false);
}
private void handleInsertFailed(ConnectContext ctx, PGroupCommitInsertResponse response) throws DdlException {
String errMsg = "group commit insert failed. db: " + db.getId() + ", table: " + table.getId()
+ ", query: " + DebugUtil.printId(ctx.queryId()) + ", backend: " + backendId
+ ", status: " + response.getStatus();
if (response.hasErrorUrl()) {
errMsg += ", error url: " + response.getErrorUrl();
}
ErrorReport.reportDdlException(errMsg.replaceAll("%", "%%"), ErrorCode.ERR_FAILED_WHEN_INSERT);
}
private void setReturnInfo(ConnectContext ctx, boolean reuse, PGroupCommitInsertResponse response) {
String labelName = response.getLabel();
TransactionStatus txnStatus = TransactionStatus.PREPARE;
long txnId = response.getTxnId();
long loadedRows = response.getLoadedRows();
long filteredRows = (int) response.getFilteredRows();
String errorUrl = response.getErrorUrl();
// the same as {@OlapInsertExecutor#setReturnInfo}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
}
/*if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}*/
if (!Strings.isNullOrEmpty(errorUrl)) {
sb.append(", 'err_url':'").append(errorUrl).append("'");
}
sb.append(", 'query_id':'").append(DebugUtil.printId(ctx.queryId())).append("'");
if (reuse) {
sb.append(", 'reuse_group_commit_plan':'").append(true).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, (int) filteredRows, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, db.getFullName(), table.getName(),
txnStatus, loadedRows, (int) filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
}