PartitionRange.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.cache;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.planner.PartitionColumnFilter;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Convert the range of the partition to the list
* all partition by day/week/month split to day list
*/
public class PartitionRange {
private static final Logger LOG = LogManager.getLogger(PartitionRange.class);
public class PartitionSingle {
private Partition partition;
private PartitionKey partitionKey;
private long partitionId;
private PartitionKeyType cacheKey;
private boolean fromCache;
private boolean tooNew;
public Partition getPartition() {
return partition;
}
public void setPartition(Partition partition) {
this.partition = partition;
}
public PartitionKey getPartitionKey() {
return partitionKey;
}
public void setPartitionKey(PartitionKey key) {
this.partitionKey = key;
}
public long getPartitionId() {
return partitionId;
}
public void setPartitionId(long partitionId) {
this.partitionId = partitionId;
}
public PartitionKeyType getCacheKey() {
return cacheKey;
}
public void setCacheKey(PartitionKeyType cacheKey) {
this.cacheKey.clone(cacheKey);
}
public boolean isFromCache() {
return fromCache;
}
public void setFromCache(boolean fromCache) {
this.fromCache = fromCache;
}
public boolean isTooNew() {
return tooNew;
}
public void setTooNew(boolean tooNew) {
this.tooNew = tooNew;
}
public PartitionSingle() {
this.partitionId = 0;
this.cacheKey = new PartitionKeyType();
this.fromCache = false;
this.tooNew = false;
}
public void debug() {
if (partition != null) {
LOG.info("partition id {}, cacheKey {}, version {}, time {}, fromCache {}, tooNew {} ",
partitionId, cacheKey.realValue(),
partition.getVisibleVersion(), partition.getVisibleVersionTime(),
fromCache, tooNew);
} else {
LOG.info("partition id {}, cacheKey {}, fromCache {}, tooNew {} ", partitionId,
cacheKey.realValue(), fromCache, tooNew);
}
}
}
public enum KeyType {
DEFAULT,
LONG,
DATE,
DATETIME,
TIME
}
public static class PartitionKeyType {
private DateTimeFormatter df8 = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.systemDefault());
private DateTimeFormatter df10 = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
public KeyType keyType = KeyType.DEFAULT;
public long value;
public Date date;
public boolean init(Type type, String str) {
switch (type.getPrimitiveType()) {
case DATE:
case DATEV2:
try {
date = Date.from(
LocalDate.parse(str, df10).atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
} catch (Exception e) {
LOG.warn("parse error str{}.", str);
return false;
}
keyType = KeyType.DATE;
break;
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
value = Long.parseLong(str);
keyType = KeyType.LONG;
break;
default:
LOG.info("PartitionCache not support such key type {}", type.toSql());
return false;
}
return true;
}
public boolean init(Type type, LiteralExpr expr) {
switch (type.getPrimitiveType()) {
case BOOLEAN:
case TIME:
case TIMEV2:
case DATETIME:
case DATETIMEV2:
case FLOAT:
case DOUBLE:
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DECIMAL256:
case CHAR:
case VARCHAR:
case STRING:
case LARGEINT:
LOG.info("PartitionCache not support such key type {}", type.toSql());
return false;
case DATE:
case DATEV2:
date = getDateValue(expr);
keyType = KeyType.DATE;
break;
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
value = expr.getLongValue();
keyType = KeyType.LONG;
break;
default:
return true;
}
return true;
}
public void clone(PartitionKeyType key) {
keyType = key.keyType;
value = key.value;
date = key.date;
}
public void add(int num) {
if (keyType == KeyType.DATE) {
date = new Date(date.getTime() + num * 3600 * 24 * 1000);
} else {
value += num;
}
}
public String toString() {
if (keyType == KeyType.DEFAULT) {
return "";
} else if (keyType == KeyType.DATE) {
return df10.format(LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()));
} else {
return String.valueOf(value);
}
}
public long realValue() {
if (keyType == KeyType.DATE) {
return Long.parseLong(df8.format(LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault())));
} else {
return value;
}
}
private Date getDateValue(LiteralExpr expr) {
Preconditions.checkArgument(expr.getType() == Type.DATE || expr.getType() == Type.DATEV2);
value = expr.getLongValue();
Date dt = null;
try {
dt = Date.from(LocalDate.parse(String.valueOf(value), df8).atStartOfDay().atZone(ZoneId.systemDefault())
.toInstant());
} catch (Exception e) {
// CHECKSTYLE IGNORE THIS LINE
}
return dt;
}
}
private CompoundPredicate partitionKeyPredicate;
private OlapTable olapTable;
private RangePartitionInfo rangePartitionInfo;
private Column partitionColumn;
private List<PartitionSingle> partitionSingleList;
public CompoundPredicate getPartitionKeyPredicate() {
return partitionKeyPredicate;
}
public void setPartitionKeyPredicate(CompoundPredicate partitionKeyPredicate) {
this.partitionKeyPredicate = partitionKeyPredicate;
}
public RangePartitionInfo getRangePartitionInfo() {
return rangePartitionInfo;
}
public void setRangePartitionInfo(RangePartitionInfo rangePartitionInfo) {
this.rangePartitionInfo = rangePartitionInfo;
}
public Column getPartitionColumn() {
return partitionColumn;
}
public void setPartitionColumn(Column partitionColumn) {
this.partitionColumn = partitionColumn;
}
public List<PartitionSingle> getPartitionSingleList() {
return partitionSingleList;
}
public PartitionRange() {
}
public PartitionRange(CompoundPredicate partitionKeyPredicate, OlapTable olapTable,
RangePartitionInfo rangePartitionInfo) {
this.partitionKeyPredicate = partitionKeyPredicate;
this.olapTable = olapTable;
this.rangePartitionInfo = rangePartitionInfo;
this.partitionSingleList = Lists.newArrayList();
}
/**
* analytics PartitionKey and PartitionInfo
*
* @return
*/
public boolean analytics() {
if (rangePartitionInfo.getPartitionColumns().size() != 1) {
return false;
}
partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
PartitionColumnFilter filter = createPartitionFilter(this.partitionKeyPredicate, partitionColumn);
try {
if (!buildPartitionKeyRange(filter, partitionColumn)) {
return false;
}
getTablePartitionList(olapTable);
} catch (AnalysisException e) {
LOG.warn("get partition range failed, because:", e);
return false;
}
return true;
}
public boolean setCacheFlag(long cacheKey) {
boolean find = false;
for (PartitionSingle single : partitionSingleList) {
if (single.getCacheKey().realValue() == cacheKey) {
single.setFromCache(true);
find = true;
break;
}
}
return find;
}
public boolean setTooNewByID(long partitionId) {
boolean find = false;
for (PartitionSingle single : partitionSingleList) {
if (single.getPartition().getId() == partitionId) {
single.setTooNew(true);
find = true;
break;
}
}
return find;
}
public boolean setTooNewByKey(long cacheKey) {
boolean find = false;
for (PartitionSingle single : partitionSingleList) {
if (single.getCacheKey().realValue() == cacheKey) {
single.setTooNew(true);
find = true;
break;
}
}
return find;
}
/**
* Only the range query of the key of the partition is supported, and the separated partition key query is not
* supported.
* Because a query can only be divided into two parts, part1 get data from cache, part2 fetch_data by scan node
* from BE.
* Partition cache : 20191211-20191215
* Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216],[20191210 - 20191215]
* Miss cache parameter: [20191210 - 20191216]
* So hit range is full, left or right, not support middle now
*/
public Cache.HitRange buildDiskPartitionRange(List<PartitionSingle> rangeList) {
Cache.HitRange hitRange = Cache.HitRange.None;
if (partitionSingleList.size() == 0) {
return hitRange;
}
int begin = partitionSingleList.size() - 1;
int end = 0;
for (int i = 0; i < partitionSingleList.size(); i++) {
if (!partitionSingleList.get(i).isFromCache()) {
if (begin > i) {
begin = i;
}
if (end < i) {
end = i;
}
}
}
if (end < begin) {
hitRange = Cache.HitRange.Full;
return hitRange;
} else if (begin > 0 && end == partitionSingleList.size() - 1) {
hitRange = Cache.HitRange.Left;
} else if (begin == 0 && end < partitionSingleList.size() - 1) {
hitRange = Cache.HitRange.Right;
} else if (begin > 0 && end < partitionSingleList.size() - 1) {
hitRange = Cache.HitRange.Middle;
} else {
hitRange = Cache.HitRange.None;
}
rangeList.add(partitionSingleList.get(begin));
rangeList.add(partitionSingleList.get(end));
LOG.info("the new range for scan be is [{},{}], hit range", rangeList.get(0).getCacheKey().realValue(),
rangeList.get(1).getCacheKey().realValue(), hitRange);
return hitRange;
}
/**
* Gets the partition range that needs to be updated
* @return
*/
public List<PartitionSingle> buildUpdatePartitionRange() {
List<PartitionSingle> updateList = Lists.newArrayList();
for (PartitionSingle single : partitionSingleList) {
if (!single.isFromCache() && !single.isTooNew()) {
updateList.add(single);
}
}
return updateList;
}
public boolean rewritePredicate(CompoundPredicate predicate, List<PartitionSingle> rangeList) {
if (predicate.getOp() != CompoundPredicate.Operator.AND) {
if (LOG.isDebugEnabled()) {
LOG.debug("predicate op {}", predicate.getOp().toString());
}
return false;
}
for (Expr expr : predicate.getChildren()) {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binPredicate = (BinaryPredicate) expr;
BinaryPredicate.Operator op = binPredicate.getOp();
if (binPredicate.getChildren().size() != 2) {
LOG.info("binary predicate children size {}", binPredicate.getChildren().size());
continue;
}
if (op == BinaryPredicate.Operator.NE) {
LOG.info("binary predicate op {}", op.toString());
continue;
}
PartitionKeyType key = new PartitionKeyType();
switch (op) {
case LE: //<=
key.clone(rangeList.get(1).getCacheKey());
break;
case LT: //<
key.clone(rangeList.get(1).getCacheKey());
key.add(1);
break;
case GE: //>=
key.clone(rangeList.get(0).getCacheKey());
break;
case GT: //>
key.clone(rangeList.get(0).getCacheKey());
key.add(-1);
break;
default:
break;
}
LiteralExpr newLiteral;
if (key.keyType == KeyType.DATE) {
try {
newLiteral = new DateLiteral(key.toString(), Type.DATE);
} catch (Exception e) {
LOG.warn("Date's format is error {},{}", key.toString(), e);
continue;
}
} else if (key.keyType == KeyType.LONG) {
newLiteral = new IntLiteral(key.realValue());
} else {
LOG.warn("Partition cache not support type {}", key.keyType);
continue;
}
if (binPredicate.getChild(1) instanceof LiteralExpr) {
binPredicate.removeNode(1);
binPredicate.addChild(newLiteral);
} else if (binPredicate.getChild(0) instanceof LiteralExpr) {
binPredicate.removeNode(0);
binPredicate.setChild(0, newLiteral);
} else {
continue;
}
} else if (expr instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) expr;
if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
continue;
}
}
}
return true;
}
/**
* Get partition info from SQL Predicate and OlapTable
* Pair<PartitionID, PartitionKey>
* PARTITION BY RANGE(`olap_date`)
* ( PARTITION p20200101 VALUES [("20200101"), ("20200102")),
* PARTITION p20200102 VALUES [("20200102"), ("20200103")) )
*/
private void getTablePartitionList(OlapTable table) {
Map<Long, PartitionItem> range = rangePartitionInfo.getIdToItem(false);
for (Map.Entry<Long, PartitionItem> entry : range.entrySet()) {
Long partId = entry.getKey();
for (PartitionSingle single : partitionSingleList) {
if (((RangePartitionItem) entry.getValue()).getItems().contains(single.getPartitionKey())) {
if (single.getPartitionId() == 0) {
single.setPartitionId(partId);
}
}
}
}
for (PartitionSingle single : partitionSingleList) {
single.setPartition(table.getPartition(single.getPartitionId()));
}
// filter the partitions in predicate but not in OlapTable
partitionSingleList =
partitionSingleList.stream().filter(p -> p.getPartition() != null).collect(Collectors.toList());
}
/**
* Get value range of partition column from predicate
*/
private boolean buildPartitionKeyRange(PartitionColumnFilter partitionColumnFilter,
Column partitionColumn) throws AnalysisException {
if (partitionColumnFilter.lowerBound == null || partitionColumnFilter.upperBound == null) {
LOG.info("filter is null");
return false;
}
PartitionKeyType begin = new PartitionKeyType();
PartitionKeyType end = new PartitionKeyType();
begin.init(partitionColumn.getType(), partitionColumnFilter.lowerBound);
end.init(partitionColumn.getType(), partitionColumnFilter.upperBound);
if (!partitionColumnFilter.lowerBoundInclusive) {
begin.add(1);
}
if (!partitionColumnFilter.upperBoundInclusive) {
end.add(-1);
}
if (begin.realValue() > end.realValue()) {
LOG.info("partition range begin {}, end {}", begin, end);
return false;
}
if (end.realValue() - begin.realValue() > Config.cache_result_max_row_count) {
LOG.info("partition key range is too large, begin {}, end {}", begin.realValue(), end.realValue());
return false;
}
while (begin.realValue() <= end.realValue()) {
PartitionKey key = PartitionKey.createPartitionKey(
Lists.newArrayList(new PartitionValue(begin.toString())),
Lists.newArrayList(partitionColumn));
PartitionSingle single = new PartitionSingle();
single.setCacheKey(begin);
single.setPartitionKey(key);
partitionSingleList.add(single);
begin.add(1);
}
return true;
}
private PartitionColumnFilter createPartitionFilter(CompoundPredicate partitionKeyPredicate,
Column partitionColumn) {
if (partitionKeyPredicate.getOp() != CompoundPredicate.Operator.AND) {
if (LOG.isDebugEnabled()) {
LOG.debug("not and op");
}
return null;
}
PartitionColumnFilter partitionColumnFilter = new PartitionColumnFilter();
for (Expr expr : partitionKeyPredicate.getChildren()) {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binPredicate = (BinaryPredicate) expr;
BinaryPredicate.Operator op = binPredicate.getOp();
if (binPredicate.getChildren().size() != 2) {
LOG.warn("child size {}", binPredicate.getChildren().size());
continue;
}
if (binPredicate.getOp() == BinaryPredicate.Operator.NE) {
if (LOG.isDebugEnabled()) {
LOG.debug("not support NE operator");
}
continue;
}
Expr slotBinding;
if (binPredicate.getChild(1) instanceof LiteralExpr) {
slotBinding = binPredicate.getChild(1);
} else if (binPredicate.getChild(0) instanceof LiteralExpr) {
slotBinding = binPredicate.getChild(0);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("not find LiteralExpr");
}
continue;
}
LiteralExpr literal = (LiteralExpr) slotBinding;
switch (op) {
case EQ: //=
partitionColumnFilter.setLowerBound(literal, true);
partitionColumnFilter.setUpperBound(literal, true);
break;
case LE: //<=
partitionColumnFilter.setUpperBound(literal, true);
break;
case LT: //<
partitionColumnFilter.setUpperBound(literal, false);
break;
case GE: //>=
partitionColumnFilter.setLowerBound(literal, true);
break;
case GT: //>
partitionColumnFilter.setLowerBound(literal, false);
break;
default:
break;
}
} else if (expr instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) expr;
if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
continue;
}
partitionColumnFilter.setInPredicate(inPredicate);
}
}
return partitionColumnFilter;
}
}