PartitionCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.cache;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InlineViewRef;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
public class PartitionCache extends Cache {
private static final Logger LOG = LogManager.getLogger(PartitionCache.class);
private SelectStmt nokeyStmt;
private SelectStmt rewriteStmt;
private CompoundPredicate partitionPredicate;
private OlapTable olapTable;
private RangePartitionInfo partitionInfo;
private Column partColumn;
private PartitionRange range;
private List<PartitionRange.PartitionSingle> newRangeList;
public SelectStmt getRewriteStmt() {
return rewriteStmt;
}
// only used for unit test
public SelectStmt getNokeyStmt() {
return nokeyStmt;
}
public String getSqlWithViewStmt() {
return nokeyStmt.toSql() + "|" + allViewExpandStmtListStr;
}
public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) {
super(queryId, selectStmt);
}
public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn,
CompoundPredicate partitionPredicate, String allViewExpandStmtListStr) {
this.latestTable = latestTable;
this.olapTable = (OlapTable) latestTable.table;
this.partitionInfo = partitionInfo;
this.partColumn = partColumn;
this.partitionPredicate = partitionPredicate;
this.newRangeList = Lists.newArrayList();
this.allViewExpandStmtListStr = allViewExpandStmtListStr;
}
public InternalService.PFetchCacheResult getCacheData(Status status) {
rewriteSelectStmt(null);
range = new PartitionRange(this.partitionPredicate, this.olapTable,
this.partitionInfo);
if (!range.analytics()) {
status.updateStatus(TStatusCode.INTERNAL_ERROR, "analytics range error");
return null;
}
InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
.setSqlKey(CacheProxy.getMd5(getSqlWithViewStmt()))
.addAllParams(range.getPartitionSingleList().stream().map(
p -> InternalService.PCacheParam.newBuilder()
.setPartitionKey(p.getCacheKey().realValue())
.setLastVersion(p.getPartition().getVisibleVersion())
.setLastVersionTime(p.getPartition().getVisibleVersionTime())
.build()).collect(Collectors.toList())
).build();
InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
for (InternalService.PCacheValue value : cacheResult.getValuesList()) {
range.setCacheFlag(value.getParam().getPartitionKey());
}
cacheResult = cacheResult.toBuilder().setAllCount(range.getPartitionSingleList().size()).build();
MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L);
}
range.setTooNewByID(latestTable.latestPartitionId);
//build rewrite sql
this.hitRange = range.buildDiskPartitionRange(newRangeList);
if (newRangeList != null && newRangeList.size() > 0) {
rewriteSelectStmt(newRangeList);
}
return cacheResult;
}
public void copyRowBatch(RowBatch rowBatch) {
if (rowBatchBuilder == null) {
rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Partition);
rowBatchBuilder.buildPartitionIndex(selectStmt.getResultExprs(), selectStmt.getColLabels(),
partColumn, range.buildUpdatePartitionRange());
}
if (!super.checkRowLimit()) {
return;
}
rowBatchBuilder.copyRowData(rowBatch);
}
public void updateCache() {
if (!super.checkRowLimit()) {
return;
}
InternalService.PUpdateCacheRequest updateRequest
= rowBatchBuilder.buildPartitionUpdateRequest(getSqlWithViewStmt());
if (updateRequest.getValuesCount() > 0) {
CacheBeProxy proxy = new CacheBeProxy();
Status status = new Status();
proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
int rowCount = 0;
int dataSize = 0;
for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
rowCount += value.getRowsCount();
dataSize += value.getDataSize();
}
LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
CacheAnalyzer.CacheMode.Partition, DebugUtil.printId(queryId),
DebugUtil.printId(updateRequest.getSqlKey()),
updateRequest.getValuesCount(), rowCount, dataSize);
}
}
/**
* Set the predicate containing partition key to null
*/
public void rewriteSelectStmt(List<PartitionRange.PartitionSingle> newRangeList) {
if (newRangeList == null || newRangeList.size() == 0) {
this.nokeyStmt = (SelectStmt) this.selectStmt.clone();
rewriteSelectStmt(nokeyStmt, this.partitionPredicate, null);
} else {
this.rewriteStmt = (SelectStmt) this.selectStmt.clone();
rewriteSelectStmt(rewriteStmt, this.partitionPredicate, newRangeList);
}
}
private void rewriteSelectStmt(SelectStmt newStmt, CompoundPredicate predicate,
List<PartitionRange.PartitionSingle> newRangeList) {
newStmt.setWhereClause(
rewriteWhereClause(newStmt.getWhereClause(), predicate, newRangeList)
);
List<TableRef> tableRefs = newStmt.getTableRefs();
for (TableRef tblRef : tableRefs) {
if (tblRef instanceof InlineViewRef) {
InlineViewRef viewRef = (InlineViewRef) tblRef;
QueryStmt queryStmt = viewRef.getViewStmt();
if (queryStmt instanceof SelectStmt) {
rewriteSelectStmt((SelectStmt) queryStmt, predicate, newRangeList);
}
}
}
}
/**
* Rewrite the query scope of partition key in the where condition
* origin expr : where eventdate>="2020-01-12" and eventdate<="2020-01-15"
* rewrite expr : where eventdate>="2020-01-14" and eventdate<="2020-01-15"
*/
private Expr rewriteWhereClause(Expr expr, CompoundPredicate predicate,
List<PartitionRange.PartitionSingle> newRangeList) {
if (expr == null) {
return null;
}
if (!(expr instanceof CompoundPredicate)) {
return expr;
}
if (expr.equals(predicate)) {
if (newRangeList == null) {
return null;
} else {
getPartitionRange().rewritePredicate((CompoundPredicate) expr, newRangeList);
return expr;
}
}
for (int i = 0; i < expr.getChildren().size(); i++) {
Expr child = rewriteWhereClause(expr.getChild(i), predicate, newRangeList);
if (child == null) {
expr.removeNode(i);
i--;
} else {
expr.setChild(i, child);
}
}
if (expr.getChildren().size() == 0) {
return null;
} else if (expr.getChildren().size() == 1) {
return expr.getChild(0);
} else {
return expr;
}
}
public PartitionRange getPartitionRange() {
if (range == null) {
range = new PartitionRange(this.partitionPredicate,
this.olapTable, this.partitionInfo);
return range;
} else {
return range;
}
}
}