DeleteJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJobLifeCycle {
private static final Logger LOG = LogManager.getLogger(DeleteJob.class);
public static final String DELETE_PREFIX = "delete_";
public enum DeleteState {
UN_QUORUM,
QUORUM_FINISHED,
FINISHED
}
private DeleteState state;
// job id (also the listener/callback id), passed to beginTransaction so the txn can call back into this job
private final long id;
protected static final long INVALID_TXN_ID = -1L;
protected long transactionId = INVALID_TXN_ID;
protected String label;
private final Set<Long> totalTablets;
private final Set<Long> quorumTablets;
private final Set<Long> finishedTablets;
Map<Long, TabletDeleteInfo> tabletDeleteInfoMap;
private final Set<PushTask> pushTasks;
private final DeleteInfo deleteInfo;
private final Map<Long, Short> partitionReplicaNum;
private Database targetDb;
protected OlapTable targetTbl;
private List<Partition> partitions;
private List<Predicate> deleteConditions;
private MarkedCountDownLatch<Long, Long> countDownLatch;
private long timeoutS = 300L;
public DeleteJob(long id, long transactionId, String label,
Map<Long, Short> partitionReplicaNum, DeleteInfo deleteInfo) {
this.id = id;
this.transactionId = transactionId;
this.label = label;
this.deleteInfo = deleteInfo;
totalTablets = Sets.newHashSet();
finishedTablets = Sets.newHashSet();
quorumTablets = Sets.newHashSet();
tabletDeleteInfoMap = Maps.newConcurrentMap();
pushTasks = Sets.newHashSet();
state = DeleteState.UN_QUORUM;
this.partitionReplicaNum = partitionReplicaNum;
}
public static Builder newBuilder() {
return new Builder();
}
/**
* Check and update this job's state to QUORUM_FINISHED or FINISHED if applicable.
* The meaning of each state:
* QUORUM_FINISHED: for each tablet, more than half of its replicas have finished
* FINISHED: all replicas of this job have finished
*/
private void checkAndUpdateQuorum() throws MetaNotFoundException {
long dbId = deleteInfo.getDbId();
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
for (TabletDeleteInfo tDeleteInfo : getTabletDeleteInfo()) {
Short replicaNum = partitionReplicaNum.get(tDeleteInfo.getPartitionId());
if (replicaNum == null) {
// should not happen
throw new MetaNotFoundException("Unknown partition "
+ tDeleteInfo.getPartitionId() + " when commit delete job");
}
if (tDeleteInfo.getFinishedReplicas().size() == replicaNum) {
finishedTablets.add(tDeleteInfo.getTabletId());
}
if (tDeleteInfo.getFinishedReplicas().size() >= replicaNum / 2 + 1) {
quorumTablets.add(tDeleteInfo.getTabletId());
}
}
int dropCounter = 0;
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (long tabletId : totalTablets) {
if (invertedIndex.getTabletMeta(tabletId) == null) {
// The tablet does not exist. This can happen when a schema change task finishes
// during the delete operation and the old tablet is dropped.
// This situation is considered normal, so to make sure the delete job can still finish,
// these dropped tablets are treated as completed.
finishedTablets.add(tabletId);
dropCounter++;
LOG.warn("tablet {} has been dropped when checking delete job {}", tabletId, id);
}
}
LOG.info("check delete job quorum, transaction id: {}, total tablets: {},"
+ " quorum tablets: {}, dropped tablets: {}",
transactionId, totalTablets.size(), quorumTablets.size(), dropCounter);
if (finishedTablets.containsAll(totalTablets)) {
this.state = DeleteState.FINISHED;
} else if (quorumTablets.containsAll(totalTablets)) {
this.state = DeleteState.QUORUM_FINISHED;
}
}
public DeleteState getState() {
return this.state;
}
private void addTablet(long tabletId) {
totalTablets.add(tabletId);
}
public void addPushTask(PushTask pushTask) {
pushTasks.add(pushTask);
}
public void addFinishedReplica(long partitionId, long tabletId, Replica replica) {
tabletDeleteInfoMap.putIfAbsent(tabletId, new TabletDeleteInfo(partitionId, tabletId));
TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId);
tDeleteInfo.addFinishedReplica(replica);
}
public DeleteInfo getDeleteInfo() {
return deleteInfo;
}
public String getLabel() {
return this.label;
}
@Override
public long getId() {
return this.id;
}
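// Called when the transaction becomes visible: mark the job as finished and log the delete info to the edit log.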
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
if (!txnOperated) {
return;
}
executeFinish();
Env.getCurrentEnv().getEditLog().logFinishDelete(deleteInfo);
}
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) {
// just to clean the callback
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
}
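// Mark the job as FINISHED, record it in the DeleteHandler and deregister the transaction callback.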
public void executeFinish() {
this.state = DeleteState.FINISHED;
Env.getCurrentEnv().getDeleteHandler().recordFinishedJob(this);
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
}
public long getTransactionId() {
return this.transactionId;
}
public Collection<TabletDeleteInfo> getTabletDeleteInfo() {
return tabletDeleteInfoMap.values();
}
public void setTimeoutS(long timeoutS) {
this.timeoutS = timeoutS;
}
public long getTimeoutMs() {
if (FeConstants.runningUnitTest) {
// for making unit test run fast
return 1000;
}
return timeoutS * 1000L;
}
public void setTargetDb(Database targetDb) {
this.targetDb = targetDb;
}
public void setTargetTbl(OlapTable targetTbl) {
this.targetTbl = targetTbl;
}
public void setPartitions(List<Partition> partitions) {
this.partitions = partitions;
}
public void setDeleteConditions(List<Predicate> deleteConditions) {
this.deleteConditions = deleteConditions;
}
public void setCountDownLatch(MarkedCountDownLatch<Long, Long> countDownLatch) {
this.countDownLatch = countDownLatch;
}
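// Begin a frontend-initiated transaction for this delete job and register this job as the
// transaction state change callback.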
@Override
public long beginTxn() throws Exception {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
Lists.newArrayList(deleteInfo.getTableId()), label, null,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second);
this.transactionId = txnId;
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
return txnId;
}
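// Create a DELETE push task for every replica of every tablet in the selected partitions,
// after checking that each delete condition column exists in every materialized index,
// then submit the tasks to the backends.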
@Override
public void dispatch() throws Exception {
// tasks to be sent to BE (backend)
AgentBatchTask batchTask = new AgentBatchTask();
String vaultId = targetTbl.getStorageVaultId();
for (Partition partition : partitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
long indexId = index.getId();
MaterializedIndexMeta indexMeta = targetTbl.getIndexMetaByIndexId(indexId);
int schemaVersion = indexMeta.getSchemaVersion();
int schemaHash = indexMeta.getSchemaHash();
List<TColumn> columnsDesc = Lists.newArrayList();
// used to update the schema of the rowset, so the full column set must be included
for (Column column : indexMeta.getSchema(true)) {
columnsDesc.add(column.toThrift());
}
Map<String, TColumn> colNameToColDesc = columnsDesc.stream()
.collect(Collectors.toMap(c -> c.getColumnName(), Function.identity(), (v1, v2) -> v1,
() -> Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER)));
for (Predicate condition : deleteConditions) {
SlotRef slotRef = (SlotRef) condition.getChild(0);
String columnName = slotRef.getColumnName();
TColumn column = colNameToColDesc.get(slotRef.getColumnName());
if (column == null) {
columnName = CreateMaterializedViewStmt.mvColumnBuilder(columnName);
column = colNameToColDesc.get(columnName);
}
if (column == null) {
if (partition.isRollupIndex(index.getId())) {
throw new AnalysisException("If MV or rollup index exists, do not support delete."
+ "Drop existing rollup or MV and try again.");
}
throw new AnalysisException(
"condition's column not founded in index, column=" + columnName + " , index=" + index);
}
}
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
// set push type
TPushType type = TPushType.DELETE;
for (Replica replica : tablet.getReplicas()) {
long replicaId = replica.getId();
long backendId = replica.getBackendId();
countDownLatch.addMark(backendId, tabletId);
// create a push task for each replica.
// To ensure that signatures produced by the new generating algorithm do not conflict with
// old ones after an upgrade, add 10 billion to `getNextId`; old signatures are not expected
// to exceed this number.
PushTask pushTask = new PushTask(null,
backendId, targetDb.getId(), targetTbl.getId(),
partition.getId(), indexId,
tabletId, replicaId, schemaHash,
-1, "", -1, 0,
-1, type, deleteConditions,
true, TPriority.NORMAL,
TTaskType.REALTIME_PUSH,
transactionId,
Env.getCurrentEnv().getNextId() + 10000000000L,
columnsDesc,
vaultId, schemaVersion);
pushTask.setIsSchemaChanging(false);
pushTask.setCountDownLatch(countDownLatch);
if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
addPushTask(pushTask);
addTablet(tabletId);
}
}
}
}
}
// submit push tasks
if (batchTask.getTaskNum() > 0) {
AgentTaskExecutor.submit(batchTask);
}
}
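// Wait for all push tasks to finish within the job timeout. If the timeout is reached,
// fall back to quorum checking: fail if quorum is not reached, otherwise keep waiting up to
// half the timeout more for all replicas to finish.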
@Override
public void await() throws Exception {
long timeoutMs = getTimeoutMs();
boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (ok) {
if (!countDownLatch.getStatus().ok()) {
// encountered errors that should not be retried, abort directly
LOG.warn("delete job failed, errmsg={}", countDownLatch.getStatus().getErrorMsg());
throw new UserException(String.format("delete job failed, errmsg:%s",
countDownLatch.getStatus().getErrorMsg()));
}
return;
}
// handle failure
String errMsg = "";
List<Map.Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
// only show at most 5 results
List<Map.Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5));
if (!subList.isEmpty()) {
errMsg = "unfinished replicas [BackendId=TabletId]: " + Joiner.on(", ").join(subList);
}
LOG.warn(errMsg);
checkAndUpdateQuorum();
switch (state) {
case UN_QUORUM:
LOG.warn("delete job timeout: transactionId {}, timeout {}, {}",
transactionId, timeoutMs, errMsg);
throw new UserException(String.format("delete job timeout, timeout(ms):%s, msg:%s", timeoutMs, errMsg));
case QUORUM_FINISHED:
case FINISHED:
long nowQuorumTimeMs = System.currentTimeMillis();
long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2;
// if the job's state is QUORUM_FINISHED, keep waiting for a while (up to half the timeout) before committing it.
while (state == DeleteState.QUORUM_FINISHED
&& endQuorumTimeoutMs > nowQuorumTimeMs) {
checkAndUpdateQuorum();
Thread.sleep(1000);
nowQuorumTimeMs = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("wait for quorum finished delete job: {}, txn id: {}",
id, transactionId);
}
}
break;
default:
throw new IllegalStateException("wrong delete job state: " + state.name());
}
}
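// Collect the commit info (tablet id and backend id) of every finished replica,
// skipping replicas whose tablet has already been dropped.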
protected List<TabletCommitInfo> generateTabletCommitInfos() {
TabletInvertedIndex currentInvertedIndex = Env.getCurrentInvertedIndex();
List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
tabletDeleteInfoMap.forEach((tabletId, deleteInfo) -> deleteInfo.getFinishedReplicas()
.forEach(replica -> {
if (currentInvertedIndex.getTabletIdByReplica(replica.getId()) == null) {
LOG.warn("could not find tablet id for replica {}, the tablet maybe dropped", replica);
return;
}
tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendIdWithoutException()));
}));
return tabletCommitInfos;
}
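// Commit and publish the transaction, then return a JSON-style string describing the result
// (VISIBLE if publish finished, otherwise COMMITTED).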
@Override
public String commit() throws Exception {
List<TabletCommitInfo> tabletCommitInfos = generateTabletCommitInfos();
boolean visible = Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(targetDb, Lists.newArrayList(targetTbl),
transactionId, tabletCommitInfos, getTimeoutMs());
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(label);
sb.append("', 'txnId':'").append(transactionId)
.append("', 'status':'");
if (visible) {
sb.append(TransactionStatus.VISIBLE.name()).append("'");
sb.append("}");
} else {
// Although publish is unfinished, we should tell the user that the commit has already succeeded.
sb.append(TransactionStatus.COMMITTED.name()).append("'");
String msg = "delete job is committed but may be taking effect later";
sb.append(", 'msg':'").append(msg).append("'");
sb.append("}");
}
return sb.toString();
}
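// Abort the transaction. If the abort fails because the transaction is missing or already
// committed/visible, only log a warning.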
@Override
public void cancel(String reason) {
GlobalTransactionMgrIface globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
try {
globalTransactionMgr.abortTransaction(deleteInfo.getDbId(), transactionId, reason);
} catch (Exception e) {
TransactionState state = globalTransactionMgr.getTransactionState(
deleteInfo.getDbId(), transactionId);
if (state == null) {
LOG.warn("cancel delete job failed because txn not found, transactionId: {}",
transactionId);
} else if (state.getTransactionStatus() == TransactionStatus.COMMITTED
|| state.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.warn("cancel delete job failed because it has been committed, transactionId: {}",
transactionId);
} else {
LOG.warn("errors while abort transaction", e);
}
}
}
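// Remove all push tasks created by this job from the agent task queue.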
@Override
public void cleanUp() {
for (PushTask pushTask : pushTasks) {
AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(),
pushTask.getVersion(),
pushTask.getPushType(), pushTask.getTaskType());
}
}
public static class BuildParams {
private final Database db;
private final OlapTable table;
private final Collection<String> partitionNames;
private final List<Predicate> deleteConditions;
private final List<Partition> selectedPartitions;
public BuildParams(Database db, OlapTable table,
Collection<String> partitionNames,
List<Predicate> deleteConditions) {
this.db = db;
this.table = table;
this.partitionNames = partitionNames;
this.deleteConditions = deleteConditions;
this.selectedPartitions = null;
}
public BuildParams(Database db, OlapTable table,
List<String> partitionNames,
List<Partition> selectedPartitions,
List<Predicate> deleteConditions) {
this.db = db;
this.table = table;
this.partitionNames = partitionNames;
this.deleteConditions = deleteConditions;
this.selectedPartitions = selectedPartitions;
}
public OlapTable getTable() {
return table;
}
public Collection<String> getPartitionNames() {
return partitionNames;
}
public Database getDb() {
return db;
}
public List<Predicate> getDeleteConditions() {
return deleteConditions;
}
public List<Partition> getSelectedPartitions() {
return selectedPartitions;
}
}
public static class Builder {
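// Build a DeleteJob from partitions that have already been selected by the Nereids planner.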
public DeleteJob buildWithNereids(BuildParams params) {
boolean noPartitionSpecified = params.getPartitionNames().isEmpty();
List<Partition> partitions = params.getSelectedPartitions();
Map<Long, Short> partitionReplicaNum = partitions.stream()
.collect(Collectors.toMap(
Partition::getId,
partition ->
params.getTable()
.getPartitionInfo()
.getReplicaAllocation(partition.getId())
.getTotalReplicaNum()));
// generate label
String label = DELETE_PREFIX + UUID.randomUUID();
// generate jobId
long jobId = Env.getCurrentEnv().getNextId();
List<String> partitionNames = partitions.stream().map(Partition::getName).collect(Collectors.toList());
List<Long> partitionIds = partitions.stream().map(Partition::getId).collect(Collectors.toList());
DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), params.getTable().getId(),
params.getTable().getName(), getDeleteCondString(params.getDeleteConditions()),
noPartitionSpecified, partitionIds, partitionNames);
DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel()
? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo)
: new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo);
long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
deleteJob.setPartitions(partitions);
deleteJob.setDeleteConditions(params.getDeleteConditions());
deleteJob.setTargetDb(params.getDb());
deleteJob.setTargetTbl(params.getTable());
deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum));
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
deleteJob.setTimeoutS(connectContext.getExecTimeoutS());
}
return deleteJob;
}
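// Build a DeleteJob for the legacy planner, resolving the target partitions from the given
// partition names or by pruning with the delete conditions.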
public DeleteJob buildWith(BuildParams params) throws Exception {
boolean noPartitionSpecified = params.getPartitionNames().isEmpty();
List<Partition> partitions = getSelectedPartitions(params.getTable(), params.getPartitionNames(),
params.getDeleteConditions());
Map<Long, Short> partitionReplicaNum = partitions.stream()
.collect(Collectors.toMap(
Partition::getId,
partition ->
params.getTable()
.getPartitionInfo()
.getReplicaAllocation(partition.getId())
.getTotalReplicaNum()));
// generate label
String label = DELETE_PREFIX + UUID.randomUUID();
// generate jobId
long jobId = Env.getCurrentEnv().getNextId();
List<String> partitionNames = partitions.stream().map(Partition::getName).collect(Collectors.toList());
List<Long> partitionIds = partitions.stream().map(Partition::getId).collect(Collectors.toList());
DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), params.getTable().getId(),
params.getTable().getName(), getDeleteCondString(params.getDeleteConditions()),
noPartitionSpecified, partitionIds, partitionNames);
DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel()
? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo)
: new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo);
long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
deleteJob.setPartitions(partitions);
deleteJob.setDeleteConditions(params.getDeleteConditions());
deleteJob.setTargetDb(params.getDb());
deleteJob.setTargetTbl(params.getTable());
deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum));
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
deleteJob.setTimeoutS(connectContext.getExecTimeoutS());
}
return deleteJob;
}
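// Resolve the target partitions. When no partition is specified, prune range/list partitions
// with the delete conditions, or fall back to all partitions if delete_without_partition is
// enabled; for unpartitioned tables the single implicit partition is used.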
private List<Partition> getSelectedPartitions(OlapTable olapTable, Collection<String> partitionNames,
List<Predicate> deleteConditions) throws Exception {
if (partitionNames.isEmpty()) {
// No partition is specified in the delete statement,
// so use PartitionPruner to compute the selected partitions
if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE
|| olapTable.getPartitionInfo().getType() == PartitionType.LIST) {
Set<String> partitionColumnNameSet = olapTable.getPartitionColumnNames();
Map<String, ColumnRange> columnNameToRange = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (String colName : partitionColumnNameSet) {
ColumnRange columnRange = createColumnRange(olapTable, colName, deleteConditions);
// Not all partition columns are involved in predicate conditions
if (columnRange != null) {
columnNameToRange.put(colName, columnRange);
}
}
Collection<Long> selectedPartitionId = null;
if (!columnNameToRange.isEmpty()) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, PartitionItem> keyItemMap = partitionInfo.getIdToItem(false);
PartitionPruner pruner = olapTable.getPartitionInfo().getType() == PartitionType.RANGE
? new RangePartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(),
columnNameToRange)
: new ListPartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(),
columnNameToRange);
selectedPartitionId = pruner.prune();
}
// An empty selectedPartitionId means no partition matches the conditions.
// How should an empty set be returned in such a case?
if (selectedPartitionId != null && !selectedPartitionId.isEmpty()) {
for (long partitionId : selectedPartitionId) {
partitionNames.add(olapTable.getPartition(partitionId).getName());
}
} else {
if (!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) {
throw new UserException("This is a range or list partitioned table."
+ " You should specify partition in delete stmt,"
+ " or set delete_without_partition to true");
} else {
partitionNames.addAll(olapTable.getPartitionNames());
}
}
} else if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
// this is an un-partitioned table, use table name as partition name
partitionNames.add(olapTable.getDisplayName());
} else {
throw new UserException("Unknown partition type: " + olapTable.getPartitionInfo().getType());
}
}
List<Partition> partitions = Lists.newArrayList();
for (String partName : partitionNames) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
throw new DdlException("Partition does not exist. name: " + partName);
}
partitions.add(partition);
}
return partitions;
}
// Return null if there is no filter for the partition column
private ColumnRange createColumnRange(OlapTable table, String colName, List<Predicate> conditions)
throws AnalysisException {
ColumnRange result = ColumnRange.create();
Type type =
table.getBaseSchema().stream().filter(c -> c.getName().equalsIgnoreCase(colName))
.findFirst().get().getType();
boolean hasRange = false;
for (Predicate predicate : conditions) {
List<Range<ColumnBound>> bounds = createColumnRange(colName, predicate, type);
if (bounds != null) {
hasRange = true;
result.intersect(bounds);
}
}
if (hasRange) {
return result;
} else {
return null;
}
}
// Return null if the condition is not related to the partition column,
// or the operator is not supported.
private List<Range<ColumnBound>> createColumnRange(String colName, Predicate condition, Type type)
throws AnalysisException {
List<Range<ColumnBound>> result = Lists.newLinkedList();
if (condition instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
if (!(binaryPredicate.getChild(0) instanceof SlotRef)) {
return null;
}
String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
if (!colName.equalsIgnoreCase(columnName)) {
return null;
}
ColumnBound bound = ColumnBound.of(
LiteralExpr.create(binaryPredicate.getChild(1).getStringValue(), type));
switch (binaryPredicate.getOp()) {
case EQ:
result.add(Range.closed(bound, bound));
break;
case GE:
result.add(Range.atLeast(bound));
break;
case GT:
result.add(Range.greaterThan(bound));
break;
case LT:
result.add(Range.lessThan(bound));
break;
case LE:
result.add(Range.atMost(bound));
break;
case NE:
result.add(Range.lessThan(bound));
result.add(Range.greaterThan(bound));
break;
default:
return null;
}
} else if (condition instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) condition;
if (!(inPredicate.getChild(0) instanceof SlotRef)) {
return null;
}
String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
if (!colName.equals(columnName)) {
return null;
}
if (inPredicate.isNotIn()) {
return null;
}
for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
ColumnBound bound = ColumnBound.of(LiteralExpr
.create(inPredicate.getChild(i).getStringValue(), type));
result.add(Range.closed(bound, bound));
}
} else {
return null;
}
return result;
}
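// Convert the delete predicates into their string form, which is persisted in DeleteInfo.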
private List<String> getDeleteCondString(List<Predicate> conditions) {
List<String> deleteConditions = Lists.newArrayListWithCapacity(conditions.size());
// save delete conditions
for (Predicate condition : conditions) {
if (condition instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0);
String columnName = slotRef.getColumnName();
String condStr = columnName + " " + binaryPredicate.getOp().name() + " \""
+ binaryPredicate.getChild(1).getStringValue() + "\"";
deleteConditions.add(condStr);
} else if (condition instanceof IsNullPredicate) {
IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0);
String columnName = slotRef.getColumnName();
StringBuilder sb = new StringBuilder();
sb.append(columnName);
if (isNullPredicate.isNotNull()) {
sb.append(" IS NOT NULL");
} else {
sb.append(" IS NULL");
}
deleteConditions.add(sb.toString());
} else if (condition instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) condition;
SlotRef slotRef = (SlotRef) inPredicate.getChild(0);
String columnName = slotRef.getColumnName();
StringBuilder strBuilder = new StringBuilder();
String notStr = inPredicate.isNotIn() ? "NOT " : "";
strBuilder.append(columnName).append(" ").append(notStr).append("IN (");
for (int i = 1; i <= inPredicate.getInElementNum(); ++i) {
strBuilder.append(inPredicate.getChild(i).toSql());
strBuilder.append((i != inPredicate.getInElementNum()) ? ", " : "");
}
strBuilder.append(")");
deleteConditions.add(strBuilder.toString());
}
}
return deleteConditions;
}
}
protected void addTableIndexes(TransactionState state) {
state.addTableIndexes(targetTbl);
}
}