DeleteFromCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnary;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
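* Command that deletes rows from an OLAP table.
* Supported predicates are handed to the delete handler; otherwise the work is delegated to
* {@code DeleteFromUsingCommand}. Explain is only available for unique key tables.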
*/
public class DeleteFromCommand extends Command implements ForwardWithSync, Explainable {
private static final Logger LOG = LogManager.getLogger(DeleteFromCommand.class);
protected final List<String> nameParts;
protected final String tableAlias;
protected final boolean isTempPart;
protected final List<String> partitions;
protected final LogicalPlan logicalQuery;
/**
* constructor
*/
public DeleteFromCommand(List<String> nameParts, String tableAlias,
boolean isTempPart, List<String> partitions, LogicalPlan logicalQuery) {
super(PlanType.DELETE_COMMAND);
this.nameParts = Utils.copyRequiredList(nameParts);
this.tableAlias = tableAlias;
this.isTempPart = isTempPart;
this.partitions = Utils.copyRequiredList(partitions);
this.logicalQuery = logicalQuery;
}
/**
 * Executes the delete statement.
 *
 * <p>Steps:
 * <ol>
 *   <li>adjust session variables and plan {@code logicalQuery} with auth checks skipped
 *       (the previous skip-auth flag is restored afterwards);</li>
 *   <li>if the physical plan is an empty relation, report an empty result and return;</li>
 *   <li>check LOAD privilege on the scanned table;</li>
 *   <li>validate the plan shape, the referenced columns and the predicates; if validation fails,
 *       delegate to {@code DeleteFromUsingCommand};</li>
 *   <li>unique key merge-on-write tables without light delete are also delegated to
 *       {@code DeleteFromUsingCommand};</li>
 *   <li>otherwise hand the predicates and the selected partitions to the delete handler.</li>
 * </ol>
 *
 * @param ctx connect context of the current session
 * @param executor statement executor that receives the planner
 * @throws Exception the original validation error (with the fallback failure attached as a
 *         suppressed exception) when both the predicate delete and the fallback fail, or any
 *         error raised by planning, privilege checking or the delete handler
 */
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
    LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
    updateSessionVariableForDelete(ctx.getSessionVariable());
    NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
    boolean originalIsSkipAuth = ctx.isSkipAuth();
    // delete not need select priv
    ctx.setSkipAuth(true);
    try {
        planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
    } finally {
        ctx.setSkipAuth(originalIsSkipAuth);
    }
    executor.setPlanner(planner);
    executor.checkBlockRules();
    // if fe could do fold constant to get delete will do nothing for table, just return.
    if (planner.getPhysicalPlan() instanceof PhysicalEmptyRelation) {
        Env.getCurrentEnv()
                .getDeleteHandler().processEmptyRelation(ctx.getState());
        return;
    }
    Optional<PhysicalFilter<?>> optFilter = (planner.getPhysicalPlan()
            .<PhysicalFilter<?>>collect(PhysicalFilter.class::isInstance)).stream()
            .findAny();
    Optional<PhysicalOlapScan> optScan = (planner.getPhysicalPlan()
            .<PhysicalOlapScan>collect(PhysicalOlapScan.class::isInstance)).stream()
            .findAny();
    Optional<UnboundRelation> optRelation = (logicalQuery
            .<UnboundRelation>collect(UnboundRelation.class::isInstance)).stream()
            .findAny();
    Preconditions.checkArgument(optFilter.isPresent(), "delete command must contain filter");
    Preconditions.checkArgument(optScan.isPresent(), "delete command could be only used on olap table");
    Preconditions.checkArgument(optRelation.isPresent(), "delete command could be only used on olap table");
    PhysicalOlapScan scan = optScan.get();
    UnboundRelation relation = optRelation.get();
    PhysicalFilter<?> filter = optFilter.get();
    checkLoadPrivilege(scan);
    // predicate check
    OlapTable olapTable = scan.getTable();
    Set<String> columns = olapTable.getFullSchema().stream().map(Column::getName).collect(Collectors.toSet());
    try {
        Plan plan = planner.getPhysicalPlan();
        checkSubQuery(plan);
        for (Expression conjunct : filter.getConjuncts()) {
            conjunct.<SlotReference>collect(SlotReference.class::isInstance)
                    .forEach(s -> checkColumn(columns, s, olapTable));
            checkPredicate(conjunct);
        }
    } catch (Exception e) {
        // The statement cannot be executed as a plain predicate delete; try the fallback command.
        if (LOG.isDebugEnabled()) {
            LOG.debug("delete predicate check failed, fall back to DeleteFromUsingCommand", e);
        }
        try {
            new DeleteFromUsingCommand(nameParts, tableAlias, isTempPart, partitions,
                    logicalQuery, Optional.empty()).run(ctx, executor);
            return;
        } catch (Exception e2) {
            // Report the original validation error, but keep the fallback failure attached
            // instead of dropping it. addSuppressed rejects self-suppression, hence the guard.
            if (e2 != e) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }
    if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()
            && !olapTable.getEnableMowLightDelete()) {
        new DeleteFromUsingCommand(nameParts, tableAlias, isTempPart, partitions,
                logicalQuery, Optional.empty()).run(ctx, executor);
        return;
    }
    // call delete handler to process
    List<Predicate> predicates = collectDeletePredicates(planner);
    if (predicates.isEmpty()) {
        // TODO this will delete all rows, however storage layer do not support true predicate now
        //  just throw exception to fallback until storage support true predicate.
        throw new AnalysisException("delete all rows is forbidden temporary.");
    }
    ArrayList<String> partitionNames = Lists.newArrayList(relation.getPartNames());
    List<Partition> selectedPartitions = getSelectedPartitions(olapTable, filter, scan, partitionNames);
    Env.getCurrentEnv()
            .getDeleteHandler()
            .process((Database) scan.getDatabase(), scan.getTable(),
                    selectedPartitions, predicates, ctx.getState(), partitionNames);
}

/**
 * Verifies that the current user holds LOAD privilege on the scanned table.
 *
 * @param scan the olap scan of the delete target
 * @throws AnalysisException if the privilege check fails
 */
private void checkLoadPrivilege(PhysicalOlapScan scan) {
    if (!Env.getCurrentEnv().getAccessManager()
            .checkTblPriv(ConnectContext.get(), scan.getDatabase().getCatalog().getName(),
                    scan.getDatabase().getFullName(),
                    scan.getTable().getName(), PrivPredicate.LOAD)) {
        String message = ErrorCode.ERR_TABLEACCESS_DENIED_ERROR.formatErrorMsg("LOAD",
                ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                scan.getDatabase().getFullName() + ": " + scan.getTable().getName());
        throw new AnalysisException(message);
    }
}

/**
 * Collects the conjuncts of the first scan node as delete predicates, skipping every conjunct
 * that references the delete sign column.
 *
 * @param planner planner holding the translated scan nodes
 * @return the predicates to pass to the delete handler
 * @throws AnalysisException if a remaining conjunct is not a {@code Predicate}
 */
private List<Predicate> collectDeletePredicates(NereidsPlanner planner) {
    return planner.getScanNodes().get(0).getConjuncts().stream()
            .filter(c -> {
                // filter predicate __DORIS_DELETE_SIGN__ = 0
                List<Expr> slotRefs = Lists.newArrayList();
                c.collect(SlotRef.class::isInstance, slotRefs);
                return slotRefs.stream().map(SlotRef.class::cast)
                        .noneMatch(s -> Column.DELETE_SIGN.equalsIgnoreCase(s.getColumnName()));
            })
            .map(c -> {
                if (c instanceof Predicate) {
                    return (Predicate) c;
                } else {
                    throw new AnalysisException("non predicate in filter: " + c.toSql());
                }
            }).collect(Collectors.toList());
}
/**
 * Adjusts the session variables before planning a delete: turns off
 * {@code FORBID_UNKNOWN_COLUMN_STATS} and adds the ELIMINATE_NOT_NULL and
 * INFER_FILTER_NOT_NULL rules to the already disabled Nereids rules.
 *
 * @param sessionVariable session variable object to modify in place
 * @throws AnalysisException if any variable cannot be set
 */
private void updateSessionVariableForDelete(SessionVariable sessionVariable) {
    sessionVariable.setIsSingleSetVar(true);
    try {
        // turn off forbid unknown col stats
        SetVar allowUnknownStats = new SetVar(
                SessionVariable.FORBID_UNKNOWN_COLUMN_STATS, new StringLiteral("false"));
        VariableMgr.setVar(sessionVariable, allowUnknownStats);

        // disable eliminate not null rule, keeping whatever the session disabled already
        List<String> rulesToDisable = new ArrayList<>();
        rulesToDisable.add(RuleType.ELIMINATE_NOT_NULL.name());
        rulesToDisable.add(RuleType.INFER_FILTER_NOT_NULL.name());
        rulesToDisable.addAll(sessionVariable.getDisableNereidsRuleNames());
        String joinedRules = StringUtils.join(rulesToDisable, ",");
        SetVar disabledRules = new SetVar(
                SessionVariable.DISABLE_NEREIDS_RULES, new StringLiteral(joinedRules));
        VariableMgr.setVar(sessionVariable, disabledRules);
    } catch (Exception e) {
        throw new AnalysisException("set session variable by delete from command failed", e);
    }
}
/**
 * Determines which partitions of {@code olapTable} the delete has to be applied to.
 *
 * <p>Unpartitioned tables yield all partitions. Otherwise the candidate partitions are either
 * the ones named in the statement or all partitions returned by
 * {@code getIdToItem(false)}, and the candidates are pruned with the filter predicate.
 *
 * @param olapTable target table
 * @param filter physical filter holding the delete predicate
 * @param scan olap scan of the target table, used to look up cached sorted partition ranges
 * @param partitionNames partition names given in the statement, may be null or empty
 * @return the partitions remaining after pruning
 * @throws AnalysisException if a name in {@code partitionNames} cannot be resolved on the table
 */
private List<Partition> getSelectedPartitions(
        OlapTable olapTable, PhysicalFilter<?> filter,
        PhysicalOlapScan scan,
        List<String> partitionNames) {
    PartitionInfo partitionInfo = olapTable.getPartitionInfo();
    // For un_partitioned table, return all partitions.
    if (partitionInfo.getType().equals(PartitionType.UNPARTITIONED)) {
        return Lists.newArrayList(olapTable.getPartitions());
    }
    // Map every partition column to the filter output slot with the same name (case-insensitive).
    List<Slot> filterOutput = filter.getOutput();
    List<Slot> partitionSlots = Lists.newArrayList();
    for (Column partitionColumn : olapTable.getPartitionColumns()) {
        // loop search is faster than build a map
        for (Slot slot : filterOutput) {
            if (slot.getName().equalsIgnoreCase(partitionColumn.getName())) {
                partitionSlots.add(slot);
                break;
            }
        }
    }
    Map<Long, PartitionItem> idToPartitions = partitionInfo.getIdToItem(false);
    Optional<SortedPartitionRanges<Long>> sortedPartitionRanges = Optional.empty();
    if (partitionNames != null && !partitionNames.isEmpty()) {
        // User specified partition is not empty: restrict the candidates to those partitions.
        Set<Long> partitionIds = partitionNames.stream()
                .map(name -> resolvePartitionId(olapTable, name))
                .collect(Collectors.toSet());
        Map<Long, PartitionItem> allItems = idToPartitions;
        idToPartitions = allItems.keySet().stream()
                .filter(partitionIds::contains)
                .collect(Collectors.toMap(Function.identity(), allItems::get));
    } else {
        Optional<SortedPartitionRanges<?>> cachedRanges
                = Env.getCurrentEnv().getSortedPartitionsCacheManager().get(olapTable, scan);
        if (cachedRanges.isPresent()) {
            // The cache is keyed generically; the pruner below works on partition ids (Long).
            @SuppressWarnings("unchecked")
            SortedPartitionRanges<Long> ranges = (SortedPartitionRanges<Long>) cachedRanges.get();
            sortedPartitionRanges = Optional.of(ranges);
        }
    }
    List<Long> prunedPartitions = PartitionPruner.prune(
            partitionSlots, filter.getPredicate(), idToPartitions,
            CascadesContext.initContext(new StatementContext(), this, PhysicalProperties.ANY),
            PartitionTableType.OLAP, sortedPartitionRanges);
    return prunedPartitions.stream().map(olapTable::getPartition).collect(Collectors.toList());
}

/**
 * Resolves a partition name to its id, failing with a descriptive error instead of a
 * NullPointerException when the table does not return a partition for that name.
 */
private Long resolvePartitionId(OlapTable olapTable, String partitionName) {
    Partition partition = olapTable.getPartition(partitionName);
    if (partition == null) {
        throw new AnalysisException("Partition[" + partitionName + "] is not found in table["
                + olapTable.getName() + "]");
    }
    return partition.getId();
}
/**
 * Validates that a column referenced by the delete predicate may be used in a predicate delete.
 *
 * <p>The delete sign column is always accepted. Any other column is rejected when it
 * is a shadow column, has a shadow column in {@code tableColumns}, is not a scalar type or is a
 * metric-only type other than jsonb, is float-like, is a value column of an aggregate table or
 * of a unique table without merge-on-write, or is missing from one of the table's indexes.
 *
 * @param tableColumns names of all columns in the table's full schema
 * @param slotReference slot taken from the delete predicate
 * @param table target table
 * @throws AnalysisException if the column cannot be used in a delete predicate
 */
private void checkColumn(Set<String> tableColumns, SlotReference slotReference, OlapTable table) {
    // 0. must slot from table
    if (!slotReference.getOriginalColumn().isPresent()) {
        // Previously thrown with an empty message, which gave the user nothing to act on.
        throw new AnalysisException("Delete condition can only be applied to columns of the target table,"
                + " but slot[" + slotReference.getName() + "] does not come from a table column.");
    }
    Column column = slotReference.getOriginalColumn().get();
    if (Column.DELETE_SIGN.equalsIgnoreCase(column.getName())) {
        return;
    }
    // 1. shadow column
    if (Column.isShadowColumn(column.getName())) {
        throw new AnalysisException("Can not apply delete condition to shadow column " + column.getName());
    }
    // 2. table has shadow column on table related to column in predicates
    String shadowName = Column.getShadowName(column.getName());
    if (tableColumns.contains(shadowName)) {
        throw new AnalysisException(String.format("Column '%s' is under"
                + " schema change operation. Do not allow delete operation", shadowName));
    }
    // 3. check column is primitive type
    // TODO(Now we can not push down non-scala type like array/map/struct to storage layer because of
    //  predict_column in be not support non-scala type, so we just should ban this type in delete predict, when
    //  we delete predict_column in be we should delete this ban)
    if (!column.getType().isScalarType()
            || (column.getType().isOnlyMetricType() && !column.getType().isJsonbType())) {
        // Plain concatenation: the type text must not be interpreted as a format string.
        throw new AnalysisException("Can not apply delete condition to column type: " + column.getType());
    }
    // 4. column should not float or double
    if (slotReference.getDataType().isFloatLikeType()) {
        throw new AnalysisException("Column[" + column.getName() + "] type is float or double.");
    }
    // 5. only contains key column if agg or mor
    if (!column.isKey()) {
        if (table.getKeysType() == KeysType.AGG_KEYS) {
            throw new AnalysisException("delete predicate on value column only supports Unique table with"
                    + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName()
                    + "] is an Aggregate table.");
        } else if (table.getKeysType() == KeysType.UNIQUE_KEYS && !table.getEnableUniqueKeyMergeOnWrite()) {
            throw new AnalysisException("delete predicate on value column only supports Unique table with"
                    + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName()
                    + "] is an unique table without merge-on-write.");
        }
    }
    // 6. the column must exist (case-insensitively) in every index of the table
    for (String indexName : table.getIndexNameToId().keySet()) {
        MaterializedIndexMeta meta = table.getIndexMetaByIndexId(table.getIndexIdByName(indexName));
        Set<String> indexColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (Column indexColumn : meta.getSchema()) {
            // index column names are normalized with mvColumnBreaker before comparison
            indexColumns.add(
                    org.apache.doris.analysis.CreateMaterializedViewStmt.mvColumnBreaker(indexColumn.getName()));
        }
        if (!indexColumns.contains(column.getName())) {
            throw new AnalysisException("Column[" + column.getName() + "] not exist in index " + indexName
                    + ". maybe you need drop the corresponding materialized-view.");
        }
    }
}
/**
 * Walks down the physical plan and verifies that it is a single chain consisting only of
 * distribute, project and filter nodes that ends in an olap scan.
 *
 * @param plan root of the physical plan
 * @throws AnalysisException if any other node type is met before the olap scan
 */
private void checkSubQuery(Plan plan) {
    Plan current = plan;
    while (!(current instanceof PhysicalOlapScan)) {
        boolean isPassThroughNode = current instanceof PhysicalDistribute
                || current instanceof PhysicalProject
                || current instanceof PhysicalFilter;
        if (!isPassThroughNode) {
            throw new AnalysisException("Where clause only supports compound predicate,"
                    + " binary predicate, is_null predicate or in predicate.");
        }
        current = ((PhysicalUnary<?>) current).child();
    }
}
/**
 * Checks that a comparison has the form {@code column <op> literal}.
 *
 * @param cp comparison predicate to validate
 * @throws AnalysisException if the left side is not a slot reference or the right side is not a literal
 */
private void checkComparisonPredicate(ComparisonPredicate cp) {
    Expression lhs = cp.left();
    if (!(lhs instanceof SlotReference)) {
        String message = "Left expr of binary predicate should be column name, predicate: " + cp.toSql()
                + ", left expr type:" + lhs.getDataType();
        throw new AnalysisException(message);
    }
    Expression rhs = cp.right();
    if (!(rhs instanceof Literal)) {
        String message = "Right expr of binary predicate should be value, predicate: " + cp.toSql()
                + ", right expr type:" + rhs.getDataType();
        throw new AnalysisException(message);
    }
}
/**
 * Checks that an is-null predicate is applied directly to a column.
 *
 * @param isNull predicate to validate
 * @throws AnalysisException if the child is not a slot reference
 */
private void checkIsNull(IsNull isNull) {
    Expression operand = isNull.child();
    if (operand instanceof SlotReference) {
        return;
    }
    throw new AnalysisException(
            "Child expr of is_null predicate should be column name, predicate: " + isNull.toSql());
}
/**
 * Checks that an in-predicate has the form {@code column IN (literal, ...)} and does not exceed
 * {@code Config.max_allowed_in_element_num_of_delete} options.
 *
 * @param in predicate to validate
 * @throws AnalysisException if the compared expression is not a slot reference, there are too
 *         many options, or an option is not a literal
 */
private void checkInPredicate(InPredicate in) {
    Expression compareExpr = in.getCompareExpr();
    if (!(compareExpr instanceof SlotReference)) {
        throw new AnalysisException(
                "Left expr of in predicate should be column name, predicate: " + in.toSql()
                        + ", left expr type:" + compareExpr.getDataType());
    }
    int optionLimit = Config.max_allowed_in_element_num_of_delete;
    if (in.getOptions().size() > optionLimit) {
        throw new AnalysisException("Element num of in predicate should not be more than "
                + optionLimit);
    }
    for (Expression option : in.getOptions()) {
        if (option instanceof Literal) {
            continue;
        }
        throw new AnalysisException("Child of in predicate should be value, but get " + option);
    }
}
/**
 * Validates one predicate of the where clause. Accepted shapes are: a conjunction of accepted
 * predicates, a comparison, an is-null or an in-predicate, and the negation of a comparison,
 * is-null or in-predicate.
 *
 * @param predicate expression to validate
 * @throws AnalysisException if the predicate, or the child of a negation, has any other shape
 */
private void checkPredicate(Expression predicate) {
    if (predicate instanceof And) {
        ((And) predicate).children().forEach(this::checkPredicate);
        return;
    }
    // A negation is validated through its child; the child of a NOT may not be AND or another NOT,
    // which falls through to the error below exactly like any other unsupported expression.
    Expression target = predicate instanceof Not ? ((Not) predicate).child() : predicate;
    if (target instanceof ComparisonPredicate) {
        checkComparisonPredicate((ComparisonPredicate) target);
    } else if (target instanceof IsNull) {
        checkIsNull((IsNull) target);
    } else if (target instanceof InPredicate) {
        checkInPredicate((InPredicate) target);
    } else {
        throw new AnalysisException("Where clause only supports compound predicate,"
                + " binary predicate, is_null predicate or in predicate. But we meet "
                + target.toSql());
    }
}
/**
 * Dispatches this command to {@code visitor.visitDeleteFromCommand}.
 *
 * @param visitor plan visitor to call
 * @param context context object handed through to the visitor
 * @return whatever the visitor returns for this command
 */
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitDeleteFromCommand(this, context);
}
/**
 * Returns the plan to show for EXPLAIN, built by {@link #completeQueryPlan} from this
 * command's own logical query.
 *
 * @param ctx connect context of the current session
 * @return the completed logical plan
 */
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return completeQueryPlan(ctx, logicalQuery);
}
/**
 * Resolves {@code nameParts} to the target table.
 *
 * @param ctx connect context used to qualify the name and to look the table up
 * @return the target table
 * @throws AnalysisException if the resolved table is not an {@code OlapTable}
 */
private OlapTable getTargetTable(ConnectContext ctx) {
    TableIf resolved = RelationUtil.getTable(RelationUtil.getQualifierName(ctx, nameParts), ctx.getEnv());
    if (resolved instanceof OlapTable) {
        return (OlapTable) resolved;
    }
    throw new AnalysisException("table must be olapTable in delete command");
}
/**
 * Completes the query plan for the explain command: puts a projection on top of
 * {@code logicalQuery} and wraps the result in an unbound table sink with DELETE as DML type.
 *
 * <p>The projection sets the delete sign column to 1, maps the sequence column to the table's
 * sequence map column when one is defined, and carries key columns. Other columns are carried
 * only when the table is not merge-on-write and the column is invisible or is non-nullable
 * without a default, or when the table has a cluster key or a sync materialized view.
 *
 * @param ctx connect context used to resolve the target table
 * @param logicalQuery query selecting the rows to delete
 * @return the sink plan
 * @throws AnalysisException if the target table is not an olap table or is rejected by
 *         {@link #checkTargetTable}
 */
public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuery) {
    OlapTable targetTable = getTargetTable(ctx);
    checkTargetTable(targetTable);

    boolean isMow = targetTable.getEnableUniqueKeyMergeOnWrite();
    String tableName = tableAlias == null ? targetTable.getName() : tableAlias;
    // currently cluster key doesn't support partial update, so we can't convert
    // a delete stmt to partial update load if the table has cluster key
    boolean hasClusterKey = targetTable.getBaseSchema().stream().anyMatch(Column::isClusterKey);
    boolean hasSyncMaterializedView = targetTable.getFullSchema().stream()
            .anyMatch(column -> column.isMaterializedViewColumn());

    // add select and insert node.
    List<NamedExpression> projections = Lists.newArrayList();
    List<String> sinkColumns = Lists.newArrayList();
    for (Column column : targetTable.getBaseSchema(true)) {
        String columnName = column.getName();
        NamedExpression projection;
        if (columnName.equalsIgnoreCase(Column.DELETE_SIGN)) {
            projection = new UnboundAlias(new TinyIntLiteral(((byte) 1)), Column.DELETE_SIGN);
        } else if (columnName.equalsIgnoreCase(Column.SEQUENCE_COL)
                && targetTable.getSequenceMapCol() != null) {
            projection = new UnboundAlias(new UnboundSlot(tableName, targetTable.getSequenceMapCol()),
                    Column.SEQUENCE_COL);
        } else {
            boolean mustCarry = column.isKey()
                    || (!isMow && (!column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue())))
                    || hasClusterKey
                    || hasSyncMaterializedView;
            if (!mustCarry) {
                continue;
            }
            projection = new UnboundSlot(tableName, columnName);
        }
        projections.add(projection);
        sinkColumns.add(columnName);
    }

    LogicalPlan projected = new LogicalProject<>(projections, logicalQuery);
    boolean isPartialUpdate = isMow && !(hasClusterKey || hasSyncMaterializedView)
            && sinkColumns.size() < targetTable.getColumns().size();
    LogicalPlan sinkInput = handleCte(projected);
    // make UnboundTableSink
    return UnboundTableSinkCreator.createUnboundTableSink(nameParts, sinkColumns, ImmutableList.of(),
            isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, sinkInput);
}
/**
 * Hook called by {@link #completeQueryPlan} on the projected query before the table sink is
 * created. This implementation returns the plan unchanged.
 *
 * @param logicalPlan the projected query plan
 * @return the plan to feed into the table sink
 */
protected LogicalPlan handleCte(LogicalPlan logicalPlan) {
return logicalPlan;
}
/**
 * Verifies that the target table can be explained by this command, i.e. that it is a unique
 * key table.
 *
 * @param targetTable table resolved from the statement
 * @throws AnalysisException if the table's keys type is not {@code UNIQUE_KEYS}
 */
protected void checkTargetTable(OlapTable targetTable) {
    boolean isUniqueKeyTable = targetTable.getKeysType() == KeysType.UNIQUE_KEYS;
    if (isUniqueKeyTable) {
        return;
    }
    throw new AnalysisException("delete command on aggregate/duplicate table is not explainable");
}
/**
 * Reports the statement type of this command.
 *
 * @return always {@code StmtType.DELETE}
 */
@Override
public StmtType stmtType() {
return StmtType.DELETE;
}
}