RowBatchBuilder.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.cache;

import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TResultBatch;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Builds the RowBatch used to update the query cache, according to the
 * query's partition range and which partitions were cache hits.
 */
public class RowBatchBuilder {
    private static final Logger LOG = LogManager.getLogger(RowBatchBuilder.class);
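    // Typical usage (a sketch): construct with the query's cache mode; in
    // partition mode call buildPartitionIndex(...) once, then copyRowData(...)
    // for each fetched RowBatch, and finally buildSqlUpdateRequest(...) or
    // buildPartitionUpdateRequest(...) to obtain the PUpdateCacheRequest for
    // the cache update.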
    private InternalService.PUpdateCacheRequest updateRequest;
    private CacheAnalyzer.CacheMode cacheMode;
    // Index and type of the partition key column in the result set
    private int keyIndex;
    private Type keyType;
    // Partitions whose cache entries need updating, keyed by cache key value
    private HashMap<Long, PartitionRange.PartitionSingle> cachePartMap;
    // Serialized rows accumulated from all copied batches
    private List<byte[]> rowList;
    private int batchSize;
    private int rowSize;
    private int dataSize;
    public int getRowSize() {
        return rowSize;
    }

    public int getDataSize() {
        return dataSize;
    }
    public RowBatchBuilder(CacheAnalyzer.CacheMode mode) {
        cacheMode = mode;
        keyIndex = 0;
        keyType = Type.INVALID;
        rowList = Lists.newArrayList();
        cachePartMap = new HashMap<>();
        batchSize = 0;
        rowSize = 0;
        dataSize = 0;
    }
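    /**
     * Records the index and type of the partition key column in the result
     * expressions, and indexes, by cache key value, the partitions whose
     * cache entries need updating. Only applies in Partition cache mode.
     */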
    public void buildPartitionIndex(ArrayList<Expr> resultExpr,
                                    List<String> columnLabel, Column partColumn,
                                    List<PartitionRange.PartitionSingle> newSingleList) {
        if (cacheMode != CacheAnalyzer.CacheMode.Partition) {
            return;
        }
        // Locate the partition key column among the result expressions
        for (int i = 0; i < columnLabel.size(); i++) {
            if (columnLabel.get(i).equalsIgnoreCase(partColumn.getName())) {
                keyType = resultExpr.get(i).getType();
                keyIndex = i;
                break;
            }
        }
        if (newSingleList != null) {
            for (PartitionRange.PartitionSingle single : newSingleList) {
                cachePartMap.put(single.getCacheKey().realValue(), single);
            }
        } else {
            LOG.info("no new partition single list");
        }
    }
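    /**
     * Copies the serialized rows of one RowBatch into the internal row buffer
     * and updates the batch, row, and byte-size counters.
     */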
    public void copyRowData(RowBatch rowBatch) {
        batchSize++;
        TResultBatch resultBatch = rowBatch.getBatch();
        // For an empty result set, resultBatch will be null
        if (resultBatch == null) {
            return;
        }
        rowSize += resultBatch.getRowsSize();
        for (ByteBuffer buf : resultBatch.getRows()) {
            // Copy only the readable region; the buffer may be a slice of a larger array
            byte[] bytes = Arrays.copyOfRange(buf.array(), buf.position(), buf.limit());
            dataSize += bytes.length;
            rowList.add(bytes);
        }
    }
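    /** Resets the buffered rows, the partition index, and the size counters. */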
    public void clear() {
        rowList = Lists.newArrayList();
        cachePartMap = new HashMap<>();
        batchSize = 0;
        rowSize = 0;
        dataSize = 0;
    }
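    /**
     * Builds (or extends) the SQL_CACHE update request: all buffered rows are
     * stored as a single cache value under the MD5 key of the SQL text.
     */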
    public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(
            PUniqueId cacheKeyMd5, long partitionKey, long lastVersion, long latestTime, long partitionNum) {
        if (updateRequest == null) {
            updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
                    .setSqlKey(cacheKeyMd5)
                    .setCacheType(InternalService.CacheType.SQL_CACHE).build();
        }
        updateRequest = updateRequest.toBuilder()
                .addValues(InternalService.PCacheValue.newBuilder()
                        .setParam(InternalService.PCacheParam.newBuilder()
                                .setPartitionKey(partitionKey)
                                .setLastVersion(lastVersion)
                                .setLastVersionTime(latestTime)
                                .setPartitionNum(partitionNum)
                                .build())
                        .setDataSize(dataSize)
                        .addAllRows(rowList.stream().map(ByteString::copyFrom)
                                .collect(Collectors.toList())))
                .build();
        return updateRequest;
    }
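    /**
     * Extracts the partition key column from a serialized row. Each column is
     * encoded as a one-byte length followed by that many bytes of content, so
     * the columns before the key column are skipped by their lengths.
     */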
    public PartitionRange.PartitionKeyType getKeyFromRow(byte[] row, int index, Type type) {
        PartitionRange.PartitionKeyType key = new PartitionRange.PartitionKeyType();
        ByteBuffer buf = ByteBuffer.wrap(row);
        for (int i = 0; i <= index; i++) {
            int len = buf.get();
            if (i < index) {
                // Skip the columns before the partition key column
                buf.position(buf.position() + len);
            } else {
                byte[] content = Arrays.copyOfRange(buf.array(), buf.position(), buf.position() + len);
                key.init(type, new String(content));
            }
        }
        return key;
    }
    /**
     * Splits the buffered rows by partition key and builds the PARTITION_CACHE
     * update request, producing one cache value per partition.
     */
    public InternalService.PUpdateCacheRequest buildPartitionUpdateRequest(String sql) {
        if (updateRequest == null) {
            updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
                    .setSqlKey(CacheProxy.getMd5(sql))
                    .setCacheType(InternalService.CacheType.PARTITION_CACHE).build();
        }
        // Group the buffered rows by the partition key extracted from each row
        HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
        for (byte[] row : rowList) {
            PartitionRange.PartitionKeyType cacheKey = getKeyFromRow(row, keyIndex, keyType);
            if (!cachePartMap.containsKey(cacheKey.realValue())) {
                LOG.info("can't find partition key {}", cacheKey.realValue());
                continue;
            }
            partRowMap.computeIfAbsent(cacheKey.realValue(), k -> Lists.newArrayList()).add(row);
        }
        // Build one cache value per partition, versioned by the partition's visible version
        for (Map.Entry<Long, List<byte[]>> entry : partRowMap.entrySet()) {
            Long key = entry.getKey();
            PartitionRange.PartitionSingle partition = cachePartMap.get(key);
            List<byte[]> partitionRowList = entry.getValue();
            // Record the byte size of this partition's rows only, not the whole buffer
            int partitionDataSize = partitionRowList.stream().mapToInt(r -> r.length).sum();
            updateRequest = updateRequest.toBuilder()
                    .addValues(InternalService.PCacheValue.newBuilder()
                            .setParam(InternalService.PCacheParam.newBuilder()
                                    .setPartitionKey(key)
                                    .setLastVersion(partition.getPartition().getVisibleVersion())
                                    .setLastVersionTime(partition.getPartition().getVisibleVersionTime())
                                    .build())
                            .setDataSize(partitionDataSize)
                            .addAllRows(partitionRowList.stream().map(ByteString::copyFrom)
                                    .collect(Collectors.toList())))
                    .build();
        }
        return updateRequest;
    }
}