SqlCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.cache;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class SqlCache extends Cache {
private static final Logger LOG = LogManager.getLogger(SqlCache.class);
private String originSql;
private PUniqueId cacheMd5;
public SqlCache(TUniqueId queryId, SelectStmt selectStmt) {
super(queryId, selectStmt);
}
// For SetOperationStmt and Nereids
public SqlCache(TUniqueId queryId, String originSql) {
super(queryId);
this.originSql = originSql;
}
public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, String allViewExpandStmtListStr) {
this.latestTable = latestTable;
this.allViewExpandStmtListStr = allViewExpandStmtListStr;
this.cacheMd5 = null;
}
public PUniqueId getOrComputeCacheMd5() {
if (cacheMd5 == null) {
cacheMd5 = CacheProxy.getMd5(getSqlWithViewStmt());
}
return cacheMd5;
}
public void setCacheMd5(PUniqueId cacheMd5) {
this.cacheMd5 = cacheMd5;
}
public String getSqlWithViewStmt() {
String originSql = selectStmt != null ? selectStmt.toSql() : this.originSql;
String cacheKey = originSql + "|" + allViewExpandStmtListStr;
if (LOG.isDebugEnabled()) {
LOG.debug("Cache key: {}", cacheKey);
}
return cacheKey;
}
public long getLatestId() {
return latestTable.latestPartitionId;
}
public long getLatestTime() {
return latestTable.latestPartitionTime;
}
public long getLatestVersion() {
return latestTable.latestPartitionVersion;
}
public long getSumOfPartitionNum() {
return latestTable.sumOfPartitionNum;
}
public static Backend findCacheBe(PUniqueId cacheMd5) {
return CacheCoordinator.getInstance().findBackend(cacheMd5);
}
public static InternalService.PFetchCacheResult getCacheData(CacheProxy proxy,
PUniqueId cacheKeyMd5, long latestPartitionId, long latestPartitionVersion,
long latestPartitionTime, long sumOfPartitionNum, Status status) {
InternalService.PFetchCacheRequest request = InternalService.PFetchCacheRequest.newBuilder()
.setSqlKey(cacheKeyMd5)
.addParams(InternalService.PCacheParam.newBuilder()
.setPartitionKey(latestPartitionId)
.setLastVersion(latestPartitionVersion)
.setLastVersionTime(latestPartitionTime)
.setPartitionNum(sumOfPartitionNum))
.build();
InternalService.PFetchCacheResult cacheResult = proxy.fetchCache(request, 10000, status);
if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
cacheResult = cacheResult.toBuilder().setAllCount(1).build();
}
return cacheResult;
}
public InternalService.PFetchCacheResult getCacheData(Status status) {
InternalService.PFetchCacheResult cacheResult = getCacheData(proxy, getOrComputeCacheMd5(),
latestTable.latestPartitionId, latestTable.latestPartitionVersion,
latestTable.latestPartitionTime, latestTable.sumOfPartitionNum, status);
if (status.ok() && cacheResult != null && cacheResult.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
hitRange = HitRange.Full;
}
return cacheResult;
}
public SelectStmt getRewriteStmt() {
return null;
}
public void copyRowBatch(RowBatch rowBatch) {
if (rowBatchBuilder == null) {
rowBatchBuilder = new RowBatchBuilder(CacheAnalyzer.CacheMode.Sql);
}
if (!super.checkRowLimit()) {
return;
}
rowBatchBuilder.copyRowData(rowBatch);
}
public void updateCache() {
if (!super.checkRowLimit()) {
return;
}
PUniqueId cacheKeyMd5 = getOrComputeCacheMd5();
InternalService.PUpdateCacheRequest updateRequest =
rowBatchBuilder.buildSqlUpdateRequest(cacheKeyMd5,
latestTable.latestPartitionId,
latestTable.latestPartitionVersion,
latestTable.latestPartitionTime,
latestTable.sumOfPartitionNum
);
if (updateRequest.getValuesCount() > 0) {
CacheBeProxy proxy = new CacheBeProxy();
Status status = new Status();
proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, status);
int rowCount = 0;
int dataSize = 0;
for (InternalService.PCacheValue value : updateRequest.getValuesList()) {
rowCount += value.getRowsCount();
dataSize += value.getDataSize();
}
LOG.info("update cache model {}, queryid {}, sqlkey {}, value count {}, row count {}, data size {}",
CacheAnalyzer.CacheMode.Sql, DebugUtil.printId(queryId),
DebugUtil.printId(updateRequest.getSqlKey()),
updateRequest.getValuesCount(), rowCount, dataSize);
}
}
}