NativeInsertStmt.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExportSink;
import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Insert into is performed to load data from the result of query stmt.
* <p>
* syntax:
* INSERT INTO table_name [partition_info] [col_list] [plan_hints] query_stmt
* <p>
* table_name: is the name of target table
* partition_info: PARTITION (p1,p2)
* the partition info of target table
* col_list: (c1,c2)
* the column list of target table
* plan_hints: [STREAMING,SHUFFLE_HINT]
* The streaming plan is used by both streaming and non-streaming insert stmt.
* The only difference is that non-streaming will record the load info in LoadManager and return label.
* User can check the load info by show load stmt.
*/
@Deprecated
public class NativeInsertStmt extends InsertStmt {
private static final Logger LOG = LogManager.getLogger(InsertStmt.class);
private static final String SHUFFLE_HINT = "SHUFFLE";
private static final String NOSHUFFLE_HINT = "NOSHUFFLE";
protected final TableName tblName;
private final PartitionNames targetPartitionNames;
// parsed from targetPartitionNames.
private List<Long> targetPartitionIds;
protected List<String> targetColumnNames;
private QueryStmt queryStmt;
private final List<String> planHints;
private Boolean isRepartition;
// set after parse all columns and expr in query statement
// this result expr in the order of target table's columns
private final List<Expr> resultExprs = Lists.newArrayList();
private final Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
protected Table targetTable;
private DatabaseIf db;
private long transactionId;
// we need a new TupleDesc for olap table.
private TupleDescriptor olapTuple;
private DataSink dataSink;
private DataPartition dataPartition;
private final List<Column> targetColumns = Lists.newArrayList();
/*
* InsertStmt may be analyzed twice, but transaction must be only begun once.
* So use a boolean to check if transaction already begun.
*/
private boolean isTransactionBegin = false;
private boolean isValuesOrConstantSelect;
private boolean isPartialUpdate = false;
private HashSet<String> partialUpdateCols = new HashSet<String>();
// Used for group commit insert
private boolean isGroupCommit = false;
private TUniqueId loadId = null;
private long tableId = -1;
public boolean isGroupCommitStreamLoadSql = false;
private GroupCommitPlanner groupCommitPlanner;
private boolean reuseGroupCommitPlan = false;
private boolean isFromDeleteOrUpdateStmt = false;
private InsertType insertType = InsertType.NATIVE_INSERT;
boolean hasEmptyTargetColumns = false;
private boolean allowAutoPartition = true;
enum InsertType {
NATIVE_INSERT("insert_"),
UPDATE("update_"),
DELETE("delete_");
private String labePrefix;
InsertType(String labePrefix) {
this.labePrefix = labePrefix;
}
}
public NativeInsertStmt(NativeInsertStmt other) {
super(other.label, null, null);
this.tblName = other.tblName;
this.targetPartitionNames = other.targetPartitionNames;
this.label = other.label;
this.queryStmt = other.queryStmt;
this.planHints = other.planHints;
this.targetColumnNames = other.targetColumnNames;
this.isValuesOrConstantSelect = other.isValuesOrConstantSelect;
}
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
super(new LabelName(null, label), null, null);
this.tblName = target.getTblName();
this.targetPartitionNames = target.getPartitionNames();
this.label = new LabelName(null, label);
this.queryStmt = source.getQueryStmt();
this.planHints = hints;
this.targetColumnNames = cols;
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
// Ctor of group commit in sql parser
public NativeInsertStmt(long tableId, String label, List<String> cols, InsertSource source,
List<String> hints) {
this(new InsertTarget(new TableName(null, null, null), null), label, cols, source, hints);
this.tableId = tableId;
}
// Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt
public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label,
QueryStmt queryStmt, List<String> planHints, List<String> targetColumnNames, boolean allowAutoPartition) {
super(label, null, null);
this.tblName = name;
this.targetPartitionNames = targetPartitionNames;
this.queryStmt = queryStmt;
this.planHints = planHints;
this.targetColumnNames = targetColumnNames;
this.allowAutoPartition = allowAutoPartition;
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints, boolean isPartialUpdate, InsertType insertType) {
this(target, label, cols, source, hints);
this.isPartialUpdate = isPartialUpdate;
this.partialUpdateCols.addAll(cols);
this.insertType = insertType;
}
public boolean isValuesOrConstantSelect() {
return isValuesOrConstantSelect;
}
public Table getTargetTable() {
return targetTable;
}
public void setTargetTable(Table targetTable) {
this.targetTable = targetTable;
}
public long getTransactionId() {
return this.transactionId;
}
public Boolean isRepartition() {
return isRepartition;
}
public String getDbName() {
return tblName.getDb();
}
public String getTbl() {
return tblName.getTbl();
}
public List<String> getTargetColumnNames() {
return targetColumnNames;
}
public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
throws AnalysisException {
if (tableId != -1) {
TableIf table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
Preconditions.checkState(table instanceof OlapTable);
OlapTable olapTable = (OlapTable) table;
tblName.setDb(olapTable.getDatabase().getFullName());
tblName.setTbl(olapTable.getName());
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS || olapTable.getTableProperty().storeRowColumn()) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(true));
targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList());
}
}
// get dbs of statement
queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
tblName.analyze(analyzer);
// disallow external catalog except JdbcExternalCatalog
if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
&& !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
}
String dbName = tblName.getDb();
String tableName = tblName.getTbl();
String ctlName = tblName.getCtl();
// check exist
DatabaseIf db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
// check access
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), ctlName, dbName, tableName, PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
ctlName + ": " + dbName + ": " + tableName);
}
tableMap.put(table.getId(), table);
}
public QueryStmt getQueryStmt() {
return queryStmt;
}
public void setQueryStmt(QueryStmt queryStmt) {
this.queryStmt = queryStmt;
}
public boolean isExplain() {
return queryStmt.isExplain();
}
public String getLabel() {
return label.getLabelName();
}
public DataSink getDataSink() {
return dataSink;
}
public DatabaseIf getDbObj() {
return db;
}
public boolean isTransactionBegin() {
return isTransactionBegin;
}
protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (targetTable == null) {
tblName.analyze(analyzer);
// disallow external catalog except JdbcExternalCatalog
if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
&& !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
}
}
// Check privilege
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), tblName.getCtl(), tblName.getDb(),
tblName.getTbl(), PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
tblName.getCtl() + ": " + tblName.getDb() + ": " + tblName.getTbl());
}
// check partition
if (targetPartitionNames != null) {
targetPartitionNames.analyze(analyzer);
}
}
/**
* translate load related stmt to`insert into xx select xx from tvf` semantic
*/
protected void convertSemantic(Analyzer analyzer) throws UserException {
// do nothing
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
preCheckAnalyze(analyzer);
convertSemantic(analyzer);
// set target table and
analyzeTargetTable(analyzer);
db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
analyzeGroupCommit(analyzer);
if (isGroupCommit()) {
return;
}
analyzeSubquery(analyzer, false);
analyzePlanHints();
if (analyzer.getContext().isTxnModel()) {
return;
}
// create data sink
createDataSink();
// create label and begin transaction
long timeoutSecond = ConnectContext.get().getExecTimeoutS();
if (label == null || Strings.isNullOrEmpty(label.getLabelName())) {
label = new LabelName(db.getFullName(),
insertType.labePrefix + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"));
}
if (!isExplain() && !isTransactionBegin && !isGroupCommitStreamLoadSql) {
if (targetTable instanceof OlapTable) {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label.getLabelName(),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;
}
// init data sink
if (!isExplain() && targetTable instanceof OlapTable) {
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = analyzer.getContext().queryId();
int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism();
boolean isInsertStrict = analyzer.getContext().getSessionVariable().getEnableInsertStrict()
&& !isFromDeleteOrUpdateStmt;
sink.init(loadId, transactionId, db.getId(), timeoutSecond,
sendBatchParallelism, false, isInsertStrict, timeoutSecond);
}
}
protected void initTargetTable(Analyzer analyzer) throws AnalysisException {
if (targetTable == null) {
DatabaseIf db = analyzer.getEnv().getCatalogMgr()
.getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
if (db instanceof Database) {
targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl());
} else if (db instanceof JdbcExternalDatabase) {
JdbcExternalTable jdbcTable = (JdbcExternalTable) db.getTableOrAnalysisException(tblName.getTbl());
targetTable = jdbcTable.getJdbcTable();
} else {
throw new AnalysisException("Not support insert target table.");
}
}
}
private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
// Get table
initTargetTable(analyzer);
if (targetTable instanceof OlapTable) {
OlapTable olapTable = (OlapTable) targetTable;
// partition
if (targetPartitionNames != null) {
targetPartitionIds = Lists.newArrayList();
if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
}
for (String partName : targetPartitionNames.getPartitionNames()) {
Partition part = olapTable.getPartition(partName, targetPartitionNames.isTemp());
if (part == null) {
ErrorReport.reportAnalysisException(
ErrorCode.ERR_UNKNOWN_PARTITION, partName, targetTable.getName());
}
targetPartitionIds.add(part.getId());
}
}
// For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
// user MUST specify the sequence column while inserting data
//
// case1: create table by `function_column.sequence_col`
// a) insert with column list, must include the sequence map column
// b) insert without column list, already contains the column, don't need to check
// case2: create table by `function_column.sequence_type`
// a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
// b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
// by default, will fail.
if (olapTable.hasSequenceCol()) {
boolean haveInputSeqCol = false;
Optional<Column> seqColInTable = Optional.empty();
if (olapTable.getSequenceMapCol() != null) {
if (targetColumnNames != null) {
if (targetColumnNames.stream()
.anyMatch(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol()))) {
haveInputSeqCol = true; // case1.a
}
} else {
haveInputSeqCol = true; // case1.b
}
seqColInTable = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(olapTable.getSequenceMapCol())).findFirst();
} else {
if (targetColumnNames != null) {
if (targetColumnNames.stream()
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
haveInputSeqCol = true; // case2.a
} // else case2.b
}
}
if (!haveInputSeqCol && !isPartialUpdate && !isFromDeleteOrUpdateStmt
&& !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()
&& analyzer.getContext().getSessionVariable().isRequireSequenceInInsert()) {
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|| !seqColInTable.get().getDefaultValue()
.equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}
if (isPartialUpdate && olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.stream().anyMatch(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol()))) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
// need a descriptor
DescriptorTable descTable = analyzer.getDescTbl();
olapTuple = descTable.createTupleDescriptor();
for (Column col : olapTable.getFullSchema()) {
if (isPartialUpdate && partialUpdateCols.stream().noneMatch(c -> c.equalsIgnoreCase(col.getName()))) {
continue;
}
SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple);
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
slotDesc.setAutoInc(col.isAutoInc());
}
} else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable
|| targetTable instanceof JdbcTable) {
if (targetPartitionNames != null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
}
} else if (targetTable instanceof BrokerTable) {
if (targetPartitionNames != null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
}
BrokerTable brokerTable = (BrokerTable) targetTable;
if (!brokerTable.isWritable()) {
throw new AnalysisException("table " + brokerTable.getName()
+ "is not writable. path should be an dir");
}
} else {
ErrorReport.reportAnalysisException(
ErrorCode.ERR_NON_INSERTABLE_TABLE, targetTable.getName(), targetTable.getType());
}
}
private void checkColumnCoverage(Set<String> mentionedCols, List<Column> baseColumns)
throws AnalysisException {
// check columns of target table
for (Column col : baseColumns) {
if (col.isAutoInc()) {
continue;
}
if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) {
continue;
}
if (mentionedCols.contains(col.getName())) {
continue;
}
if (col.getDefaultValue() == null && !col.isAllowNull()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NOT_MENTIONED, col.getName());
}
}
}
private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserException {
// Analyze columns mentioned in the statement.
Set<String> mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
if (targetColumnNames == null) {
hasEmptyTargetColumns = true;
// the mentioned columns are columns which are visible to user, so here we use
// getBaseSchema(), not getFullSchema()
for (Column col : targetTable.getBaseSchema(false)) {
mentionedColumns.add(col.getName());
targetColumns.add(col);
}
} else {
for (String colName : targetColumnNames) {
Column col = targetTable.getColumn(colName);
if (col == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName, targetTable.getName());
}
if (!mentionedColumns.add(colName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_FIELD_SPECIFIED_TWICE, colName);
}
targetColumns.add(col);
}
// hll column must in mentionedColumns
for (Column col : targetTable.getBaseSchema()) {
if (col.getType().isObjectStored() && !col.hasDefaultValue()
&& !mentionedColumns.contains(col.getName())) {
throw new AnalysisException(
"object-stored column " + col.getName() + " must in insert into columns");
}
}
}
/*
* When doing schema change, there may be some shadow columns. we should add
* them to the end of targetColumns. And use 'origColIdxsForExtendCols' to save
* the index of column in 'targetColumns' which the shadow column related to.
* eg: origin targetColumns: (A,B,C), shadow column: __doris_shadow_B after
* processing, targetColumns: (A, B, C, __doris_shadow_B), and
* origColIdxsForExtendCols has 1 element: "1", which is the index of column B
* in targetColumns.
*
* Rule A: If the column which the shadow column related to is not mentioned,
* then do not add the shadow column to targetColumns. They will be filled by
* null or default value when loading.
*
* When table have materialized view, there may be some materialized view columns.
* we should add them to the end of targetColumns.
* eg: origin targetColumns: (A,B,C), shadow column: mv_bitmap_union_C
* after processing, targetColumns: (A, B, C, mv_bitmap_union_C), and
* origColIdx2MVColumn has 1 element: "2, mv_bitmap_union_C"
* will be used in as a mapping from queryStmt.getResultExprs() to targetColumns define expr
*/
List<Pair<Integer, Column>> origColIdxsForExtendCols = Lists.newArrayList();
if (!ConnectContext.get().isTxnModel()) {
for (Column column : targetTable.getFullSchema()) {
if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
String origName = Column.removeNamePrefix(column.getName());
for (int i = 0; i < targetColumns.size(); i++) {
if (targetColumns.get(i).nameEquals(origName, false)) {
// Rule A
origColIdxsForExtendCols.add(Pair.of(i, null));
targetColumns.add(column);
break;
}
}
}
if (column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)
|| column.isNameWithPrefix(
CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX)) {
List<SlotRef> refColumns = column.getRefColumns();
if (refColumns == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR,
column.getName(), targetTable.getName());
}
for (SlotRef refColumn : refColumns) {
String origName = refColumn.getColumnName();
for (int originColumnIdx = 0; originColumnIdx < targetColumns.size(); originColumnIdx++) {
if (targetColumns.get(originColumnIdx).nameEquals(origName, false)) {
origColIdxsForExtendCols.add(Pair.of(originColumnIdx, column));
targetColumns.add(column);
break;
}
}
}
}
}
}
// parse query statement
queryStmt.setFromInsert(true);
queryStmt.analyze(analyzer);
// deal with this case: insert into tbl values();
// should try to insert default values for all columns in tbl if set
if (isValuesOrConstantSelect) {
final ValueList valueList = ((SelectStmt) queryStmt).getValueList();
if (valueList != null && valueList.getFirstRow().isEmpty() && CollectionUtils.isEmpty(targetColumnNames)) {
final int rowSize = mentionedColumns.size();
final List<String> colLabels = queryStmt.getColLabels();
final List<Expr> resultExprs = queryStmt.getResultExprs();
Preconditions.checkState(resultExprs.isEmpty(), "result exprs should be empty.");
for (int i = 0; i < rowSize; i++) {
resultExprs.add(new StringLiteral(SelectStmt.DEFAULT_VALUE));
final DefaultValueExpr defaultValueExpr = new DefaultValueExpr();
valueList.getFirstRow().add(defaultValueExpr);
colLabels.add(defaultValueExpr.toColumnLabel());
}
}
}
if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
trySetPartialUpdate();
}
// check if size of select item equal with columns mentioned in statement
if (mentionedColumns.size() != queryStmt.getResultExprs().size()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
}
// Check if all columns mentioned is enough
// For JdbcTable, it is allowed to insert without specifying all columns and without checking
if (!(targetTable instanceof JdbcTable)) {
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());
}
List<String> realTargetColumnNames = targetColumns.stream().map(Column::getName).collect(Collectors.toList());
// handle VALUES() or SELECT constant list
if (isValuesOrConstantSelect) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getValueList() != null) {
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
// Only check for JdbcTable
if (targetTable instanceof JdbcTable) {
// Check for NULL values in not-nullable columns
for (int colIdx = 0; colIdx < targetColumns.size(); ++colIdx) {
Column column = targetColumns.get(colIdx);
// Ensure rows.get(rowIdx) has enough columns to match targetColumns
if (colIdx < rows.get(rowIdx).size()) {
Expr expr = rows.get(rowIdx).get(colIdx);
if (!column.isAllowNull() && expr instanceof NullLiteral) {
throw new AnalysisException("Column `" + column.getName()
+ "` is not nullable, but the inserted value is nullable.");
}
}
}
}
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, realTargetColumnNames,
skipCheck);
}
// clear these 2 structures, rebuild them using VALUES exprs
selectStmt.getResultExprs().clear();
selectStmt.getBaseTblResultExprs().clear();
for (int i = 0; i < selectStmt.getValueList().getFirstRow().size(); ++i) {
selectStmt.getResultExprs().add(selectStmt.getValueList().getFirstRow().get(i));
selectStmt.getBaseTblResultExprs().add(selectStmt.getValueList().getFirstRow().get(i));
}
} else {
// INSERT INTO SELECT 1,2,3 ...
List<ArrayList<Expr>> rows = Lists.newArrayList();
// ATTN: must copy the `selectStmt.getResultExprs()`, otherwise the following
// `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing
// error.
rows.add(Lists.newArrayList(selectStmt.getResultExprs()));
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, realTargetColumnNames,
skipCheck);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
// For JdbcTable, need to check whether there is a NULL value inserted into the NOT NULL column
if (targetTable instanceof JdbcTable) {
for (int colIdx = 0; colIdx < targetColumns.size(); ++colIdx) {
Column column = targetColumns.get(colIdx);
Expr expr = rows.get(0).get(colIdx);
if (!column.isAllowNull() && expr instanceof NullLiteral) {
throw new AnalysisException("Column `" + column.getName()
+ "` is not nullable, but the inserted value is nullable.");
}
}
}
for (Expr expr : rows.get(0)) {
selectStmt.getResultExprs().add(expr);
}
}
} else {
// INSERT INTO SELECT ... FROM tbl
if (!origColIdxsForExtendCols.isEmpty()) {
// extend the result expr by duplicating the related exprs
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getResultExprs(), realTargetColumnNames,
analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
} else {
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
Expr e = entry.second.getDefineExpr().clone(smap);
e.analyze(analyzer);
queryStmt.getResultExprs().add(e);
}
}
}
// check compatibility
for (int i = 0; i < targetColumns.size(); ++i) {
Column column = targetColumns.get(i);
Expr expr = queryStmt.getResultExprs().get(i);
queryStmt.getResultExprs().set(i, expr.checkTypeCompatibility(column.getType()));
}
}
// expand colLabels in QueryStmt
if (!origColIdxsForExtendCols.isEmpty()) {
if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) {
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getBaseTblResultExprs(),
realTargetColumnNames, analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
} else {
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
Expr e = entry.second.getDefineExpr().clone(smap);
e.analyze(analyzer);
queryStmt.getBaseTblResultExprs().add(e);
}
}
}
if (queryStmt.getResultExprs().size() != queryStmt.getColLabels().size()) {
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
queryStmt.getColLabels().add(queryStmt.getColLabels().get(entry.first));
}
}
}
if (LOG.isDebugEnabled()) {
for (Expr expr : queryStmt.getResultExprs()) {
if (LOG.isDebugEnabled()) {
LOG.debug("final result expr: {}, {}", expr, System.identityHashCode(expr));
}
}
for (Expr expr : queryStmt.getBaseTblResultExprs()) {
if (LOG.isDebugEnabled()) {
LOG.debug("final base table result expr: {}, {}", expr, System.identityHashCode(expr));
}
}
for (String colLabel : queryStmt.getColLabels()) {
if (LOG.isDebugEnabled()) {
LOG.debug("final col label: {}", colLabel);
}
}
}
}
private Map<String, Expr> buildSlotToIndex(ArrayList<Expr> row, List<String> realTargetColumnNames,
Analyzer analyzer) throws AnalysisException {
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < row.size(); i++) {
Expr expr = row.get(i);
expr.analyze(analyzer);
if (expr instanceof DefaultValueExpr || expr instanceof StringLiteral
&& ((StringLiteral) expr).getValue().equals(SelectStmt.DEFAULT_VALUE)) {
continue;
}
expr.analyze(analyzer);
slotToIndex.put(realTargetColumnNames.get(i),
expr.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType()));
}
for (Column column : targetTable.getBaseSchema()) {
if (!slotToIndex.containsKey(column.getName())) {
if (column.getDefaultValue() == null) {
slotToIndex.put(column.getName(), new NullLiteral());
} else {
slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue()));
}
}
}
return slotToIndex;
}
private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows, int rowIdx,
List<Pair<Integer, Column>> origColIdxsForExtendCols, List<String> realTargetColumnNames, boolean skipCheck)
throws AnalysisException {
// 1. check number of fields if equal with first row
// targetColumns contains some shadow columns, which is added by system,
// so we should minus this
if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) {
throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1));
}
if (skipCheck) {
return;
}
ArrayList<Expr> row = rows.get(rowIdx);
Map<String, Expr> slotToIndex = buildSlotToIndex(row, realTargetColumnNames, analyzer);
if (!origColIdxsForExtendCols.isEmpty()) {
/**
* we should extend the row for shadow columns.
* eg:
* the origin row has exprs: (expr1, expr2, expr3), and targetColumns is (A, B, C, __doris_shadow_b)
* after processing, extentedRow is (expr1, expr2, expr3, expr2)
*/
ArrayList<Expr> extentedRow = Lists.newArrayList();
extentedRow.addAll(row);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry != null) {
if (entry.second == null) {
extentedRow.add(extentedRow.get(entry.first));
} else {
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0));
}
}
}
row = extentedRow;
rows.set(rowIdx, row);
}
// check the compatibility of expr in row and column in targetColumns
for (int i = 0; i < row.size(); ++i) {
Expr expr = row.get(i);
Column col = targetColumns.get(i);
if (expr instanceof DefaultValueExpr) {
if (targetColumns.get(i).getDefaultValue() == null && !targetColumns.get(i).isAllowNull()
&& !targetColumns.get(i).isAutoInc()) {
throw new AnalysisException("Column has no default value, column="
+ targetColumns.get(i).getName());
}
if (targetColumns.get(i).getDefaultValue() == null) {
expr = new NullLiteral();
} else {
expr = new StringLiteral(targetColumns.get(i).getDefaultValue());
}
}
if (expr instanceof Subquery) {
throw new AnalysisException("Insert values can not be query");
}
expr.analyze(analyzer);
row.set(i, expr.checkTypeCompatibility(col.getType()));
}
}
private void analyzePlanHints() throws AnalysisException {
if (planHints == null) {
return;
}
for (String hint : planHints) {
if (SHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && !isRepartition) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint);
}
isRepartition = Boolean.TRUE;
} else if (NOSHUFFLE_HINT.equalsIgnoreCase(hint)) {
if (!targetTable.isPartitionedTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_INSERT_HINT_NOT_SUPPORT);
}
if (isRepartition != null && isRepartition) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_PLAN_HINT_CONFILT, hint);
}
isRepartition = Boolean.FALSE;
} else {
ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PLAN_HINT, hint);
}
}
}
public void prepareExpressions() throws UserException {
List<Expr> selectList = Expr.cloneList(queryStmt.getResultExprs());
// check type compatibility
int numCols = targetColumns.size();
for (int i = 0; i < numCols; ++i) {
Column col = targetColumns.get(i);
exprByName.put(col.getName(), selectList.get(i));
}
List<Pair<String, Expr>> resultExprByName = Lists.newArrayList();
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
// reorder resultExprs in table column order
for (Column col : targetTable.getFullSchema()) {
if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) {
continue;
}
Expr targetExpr;
if (exprByName.containsKey(col.getName())) {
targetExpr = exprByName.get(col.getName());
} else if (targetTable.getType().equals(TableIf.TableType.JDBC_EXTERNAL_TABLE)) {
// For JdbcTable,we do not need to generate plans for columns that are not specified at write time
continue;
} else {
// process sequence col, map sequence column to other column
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()
&& col.getName().equals(Column.SEQUENCE_COL)
&& ((OlapTable) targetTable).getSequenceMapCol() != null) {
if (resultExprByName.stream().map(Pair::key)
.anyMatch(key -> key.equalsIgnoreCase(((OlapTable) targetTable).getSequenceMapCol()))) {
resultExprByName.add(Pair.of(Column.SEQUENCE_COL,
resultExprByName.stream()
.filter(p -> p.key()
.equalsIgnoreCase(((OlapTable) targetTable).getSequenceMapCol()))
.map(Pair::value).findFirst().orElse(null)));
}
continue;
} else if (col.getDefineExpr() != null) {
targetExpr = col.getDefineExpr().clone();
} else if (col.getDefaultValue() == null) {
targetExpr = NullLiteral.create(col.getType());
} else {
if (col.getDefaultValueExprDef() != null) {
targetExpr = col.getDefaultValueExpr();
} else {
StringLiteral defaultValueExpr;
defaultValueExpr = new StringLiteral(col.getDefaultValue());
targetExpr = defaultValueExpr.checkTypeCompatibility(col.getType());
}
}
}
List<SlotRef> columns = col.getRefColumns();
if (columns != null) {
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
targetExpr = targetExpr.clone(smap);
targetExpr.analyze(analyzer);
}
resultExprByName.add(Pair.of(col.getName(), targetExpr));
slotToIndex.put(col.getName(), targetExpr);
}
resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList()));
}
private DataSink createDataSink() throws AnalysisException {
if (dataSink != null) {
return dataSink;
}
if (targetTable instanceof OlapTable) {
OlapTableSink sink;
final boolean enableSingleReplicaLoad =
analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode()
? false : analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
if (isGroupCommitStreamLoadSql) {
sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, enableSingleReplicaLoad,
ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
} else {
sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
enableSingleReplicaLoad);
}
dataSink = sink;
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
dataPartition = dataSink.getOutputPartition();
} else if (targetTable instanceof BrokerTable) {
BrokerTable table = (BrokerTable) targetTable;
// TODO(lingbin): think use which one if have more than one path
// Map<String, String> brokerProperties = Maps.newHashMap();
// BrokerDesc brokerDesc = new BrokerDesc("test_broker", brokerProperties);
BrokerDesc brokerDesc = new BrokerDesc(table.getBrokerName(), table.getBrokerProperties());
dataSink = new ExportSink(
table.getWritablePath(),
table.getColumnSeparator(),
table.getLineDelimiter(),
brokerDesc);
dataPartition = dataSink.getOutputPartition();
} else if (targetTable instanceof JdbcTable) {
// For JdbcTable, reorder targetColumns to match the order in targetTable.getFullSchema()
List<String> insertCols = new ArrayList<>();
Set<String> targetColumnNames = targetColumns.stream()
.map(Column::getName)
.collect(Collectors.toSet());
for (Column column : targetTable.getFullSchema()) {
if (targetColumnNames.contains(column.getName())) {
insertCols.add(column.getName());
}
}
dataSink = new JdbcTableSink((JdbcTable) targetTable, insertCols);
dataPartition = DataPartition.UNPARTITIONED;
} else {
dataSink = DataSink.createDataSink(targetTable);
dataPartition = DataPartition.UNPARTITIONED;
}
return dataSink;
}
public void complete() throws UserException {
if (!isExplain() && targetTable instanceof OlapTable) {
((OlapTableSink) dataSink).complete(analyzer);
if (!allowAutoPartition) {
((OlapTableSink) dataSink).setAutoPartition(false);
}
if (!isGroupCommitStreamLoadSql) {
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), transactionId);
if (txnState == null) {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}
}
public DataPartition getDataPartition() {
return dataPartition;
}
@Override
public List<? extends DataDesc> getDataDescList() {
throw new UnsupportedOperationException("only invoked for external load currently");
}
@Override
public ResourceDesc getResourceDesc() {
throw new UnsupportedOperationException("only invoked for external load currently");
}
@Override
public LoadType getLoadType() {
return LoadType.NATIVE_INSERT;
}
@Override
public NativeInsertStmt getNativeInsertStmt() {
return this;
}
@Override
public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
Preconditions.checkState(isAnalyzed());
queryStmt.rewriteExprs(rewriter);
}
@Override
public void foldConstant(ExprRewriter rewriter, TQueryOptions tQueryOptions) throws AnalysisException {
Preconditions.checkState(isAnalyzed());
queryStmt.foldConstant(rewriter, tQueryOptions);
}
@Override
public void rewriteElementAtToSlot(ExprRewriter rewriter, TQueryOptions tQueryOptions) throws AnalysisException {
Preconditions.checkState(isAnalyzed());
queryStmt.rewriteElementAtToSlot(rewriter, tQueryOptions);
}
@Override
public List<Expr> getResultExprs() {
return resultExprs;
}
@Override
public void reset() {
super.reset();
if (targetPartitionIds != null) {
targetPartitionIds.clear();
}
queryStmt.reset();
resultExprs.clear();
exprByName.clear();
dataSink = null;
dataPartition = null;
targetColumns.clear();
}
public void resetPrepare() {
label = null;
isTransactionBegin = false;
}
@Override
public RedirectStatus getRedirectStatus() {
if (isExplain() || isGroupCommit() || (ConnectContext.get() != null && ConnectContext.get().isTxnModel())) {
return RedirectStatus.NO_FORWARD;
} else {
return RedirectStatus.FORWARD_WITH_SYNC;
}
}
public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException {
// check if http stream meets group commit requirements.
// If not meets, throw exception (consider fallback to non group commit mode).
if (isGroupCommitStreamLoadSql) {
if (targetTable != null && (targetTable instanceof OlapTable)
&& !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) {
throw new AnalysisException(
"table light_schema_change is false, can't do http_stream with group commit mode");
}
return;
}
// check if 'insert into' meets group commit requirements. If meets, set isGroupCommit to true
if (isGroupCommit) {
return;
}
try {
tblName.analyze(analyzer);
initTargetTable(analyzer);
} catch (Throwable e) {
LOG.warn("analyze group commit failed", e);
return;
}
ConnectContext ctx = ConnectContext.get();
List<Pair<BooleanSupplier, Supplier<String>>> conditions = new ArrayList<>();
conditions.add(Pair.of(() -> ctx.getSessionVariable().isEnableInsertGroupCommit(),
() -> "group_commit session variable: " + ctx.getSessionVariable().groupCommit));
conditions.add(Pair.of(() -> !isExplain(), () -> "isExplain"));
conditions.add(Pair.of(() -> !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate(),
() -> "enableUniqueKeyPartialUpdate"));
conditions.add(Pair.of(() -> !ctx.isTxnModel(), () -> "isTxnModel"));
conditions.add(Pair.of(() -> targetTable instanceof OlapTable,
() -> "not olapTable, class: " + targetTable.getClass().getName()));
conditions.add(Pair.of(() -> ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange(),
() -> "notUseSchemaLightChange"));
conditions.add(Pair.of(() -> !targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME),
() -> "db is internal"));
conditions.add(
Pair.of(() -> targetPartitionNames == null, () -> "targetPartitionNames: " + targetPartitionNames));
conditions.add(Pair.of(() -> ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES,
() -> "sqlMode: " + ctx.getSessionVariable().getSqlMode()));
conditions.add(Pair.of(() -> queryStmt instanceof SelectStmt,
() -> "queryStmt is not SelectStmt, class: " + queryStmt.getClass().getName()));
conditions.add(Pair.of(() -> ((SelectStmt) queryStmt).getTableRefs().isEmpty(),
() -> "tableRefs is not empty: " + ((SelectStmt) queryStmt).getTableRefs()));
conditions.add(
Pair.of(() -> (label == null || Strings.isNullOrEmpty(label.getLabelName())), () -> "label: " + label));
conditions.add(
Pair.of(() -> (analyzer == null || analyzer != null && !analyzer.isReAnalyze()), () -> "analyzer"));
boolean match = conditions.stream().allMatch(p -> p.first.getAsBoolean());
if (match) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (selectStmt.getValueList() != null) {
for (List<Expr> row : selectStmt.getValueList().getRows()) {
for (Expr expr : row) {
if (!(expr instanceof LiteralExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for query_id: {}, table: {}, "
+ "because not literal expr: {}, row: {}",
DebugUtil.printId(ctx.queryId()), targetTable.getName(), expr, row);
}
return;
}
}
}
// Does not support: insert into tbl values();
if (selectStmt.getValueList().getFirstRow().isEmpty() && CollectionUtils.isEmpty(targetColumnNames)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for query_id: {}, table: {}, because first row: {}, "
+ "target columns: {}", DebugUtil.printId(ctx.queryId()), targetTable.getName(),
selectStmt.getValueList().getFirstRow(), targetColumnNames);
}
return;
}
} else {
SelectList selectList = selectStmt.getSelectList();
if (selectList != null) {
List<SelectListItem> items = selectList.getItems();
if (items != null) {
for (SelectListItem item : items) {
if (item.getExpr() != null && !(item.getExpr() instanceof LiteralExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("group commit is off for query_id: {}, table: {}, "
+ "because not literal expr: {}, row: {}",
DebugUtil.printId(ctx.queryId()), targetTable.getName(), item.getExpr(),
item);
}
return;
}
}
}
}
}
isGroupCommit = true;
} else {
if (LOG.isDebugEnabled()) {
for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {
if (pair.first.getAsBoolean() == false) {
LOG.debug("group commit is off for query_id: {}, table: {}, because: {}",
DebugUtil.printId(ctx.queryId()), targetTable.getName(), pair.second.get());
break;
}
}
}
}
}
public boolean isGroupCommit() {
return isGroupCommit;
}
public boolean isReuseGroupCommitPlan() {
return reuseGroupCommitPlan;
}
public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
OlapTable olapTable = (OlapTable) getTargetTable();
olapTable.readLock();
try {
if (groupCommitPlanner != null
&& olapTable.getBaseSchemaVersion() == groupCommitPlanner.baseSchemaVersion) {
if (LOG.isDebugEnabled()) {
LOG.debug("reuse group commit plan, table={}", olapTable);
}
reuseGroupCommitPlan = true;
return groupCommitPlanner;
}
reuseGroupCommitPlan = false;
if (!targetColumns.isEmpty()) {
Analyzer analyzerTmp = analyzer;
reset();
this.analyzer = analyzerTmp;
}
analyzeSubquery(analyzer, true);
groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner((Database) db, olapTable,
targetColumnNames, queryId, ConnectContext.get().getSessionVariable().getGroupCommit());
// save plan message to be reused for prepare stmt
loadId = queryId;
return groupCommitPlanner;
} finally {
olapTable.readUnlock();
}
}
public TUniqueId getLoadId() {
return loadId;
}
public int getBaseSchemaVersion() {
return groupCommitPlanner.baseSchemaVersion;
}
public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) {
this.isFromDeleteOrUpdateStmt = isFromDeleteOrUpdateStmt;
}
private void trySetPartialUpdate() throws UserException {
if (isFromDeleteOrUpdateStmt || isPartialUpdate || !(targetTable instanceof OlapTable)) {
return;
}
OlapTable olapTable = (OlapTable) targetTable;
if (olapTable.getKeysType() != KeysType.UNIQUE_KEYS || olapTable.isUniqKeyMergeOnWriteWithClusterKeys()) {
return;
}
// when enable_unique_key_partial_update = true,
// only unique table with MOW insert with target columns can consider be a partial update,
// and unique table without MOW, insert will be like a normal insert.
// when enable_unique_key_partial_update = false,
// unique table with MOW, insert will be a normal insert, and column that not set will insert default value.
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
return;
}
if (hasEmptyTargetColumns) {
return;
}
boolean hasSyncMaterializedView = olapTable.getFullSchema().stream()
.anyMatch(col -> col.isMaterializedViewColumn());
if (hasSyncMaterializedView) {
throw new UserException("Can't do partial update on merge-on-write Unique table"
+ " with sync materialized view.");
}
boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : targetColumns) {
if (insertCol.getName() != null && insertCol.getName().equalsIgnoreCase(col.getName())) {
if (!col.isVisible() && !Column.DELETE_SIGN.equals(col.getName())) {
throw new UserException("Partial update should not include invisible column except"
+ " delete sign column: " + col.getName());
}
exists = true;
break;
}
}
if (!exists) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
}
if (!hasMissingColExceptAutoIncKey) {
return;
}
isPartialUpdate = true;
for (String name : targetColumnNames) {
Column column = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(name)).findFirst().get();
partialUpdateCols.add(column.getName());
}
if (isPartialUpdate && olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
// we should re-generate olapTuple
DescriptorTable descTable = analyzer.getDescTbl();
olapTuple = descTable.createTupleDescriptor();
for (Column col : olapTable.getFullSchema()) {
if (!partialUpdateCols.contains(col.getName())) {
continue;
}
SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple);
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
}
}
public boolean containTargetColumnName(String columnName) {
return targetColumnNames != null && targetColumnNames.stream()
.anyMatch(col -> col.equalsIgnoreCase(columnName));
}
}