LakeSoulScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.lakesoul.source;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TLakeSoulFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.DataFileInfo;
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.substrait.proto.Plan;
import lombok.SneakyThrows;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
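
/**
 * Scan node for LakeSoul external tables.
 *
 * <p>During planning it loads the LakeSoul table metadata, prunes partitions with the planner's
 * column range filters, and produces one {@link LakeSoulSplit} per group of data files sharing the
 * same range partition and hash bucket. Per-split scan parameters, including an optional Substrait
 * predicate and S3/OSS connection options, are serialized into {@link TLakeSoulFileDesc} and read
 * on the backend through the JNI connector ({@code TFileFormatType.FORMAT_JNI}).
 */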
public class LakeSoulScanNode extends FileQueryScanNode {

    private static final Logger LOG = LogManager.getLogger(LakeSoulScanNode.class);

    protected LakeSoulExternalTable lakeSoulExternalTable;

    String tableName;
    String location;
    String partitions;
    Schema tableArrowSchema;
    Schema partitionArrowSchema;
    private Map<String, String> tableProperties;
    String readType;

    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
        super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv, sv);
    }
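
    /**
     * Resolves LakeSoul table metadata: table path, table name, partition spec, table properties,
     * and the Arrow schemas of the full table and of its range partition columns.
     */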
    @Override
    protected void doInitialize() throws UserException {
        super.doInitialize();
        lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
        location = tableInfo.getTablePath();
        tableName = tableInfo.getTableName();
        partitions = tableInfo.getPartitions();
        readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ();
        try {
            tableProperties = new ObjectMapper().readValue(
                    tableInfo.getProperties(),
                    new TypeReference<Map<String, String>>() {}
            );
            tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
            List<Field> partitionFields = DBUtil.parseTableInfoPartitions(partitions)
                    .rangeKeys
                    .stream()
                    .map(tableArrowSchema::findField)
                    .collect(Collectors.toList());
            partitionArrowSchema = new Schema(partitionFields);
        } catch (IOException e) {
            throw new UserException(e);
        }
    }
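
    /** LakeSoul files are read on the backend through the JNI connector. */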
    @Override
    protected TFileFormatType getFileFormatType() throws UserException {
        return TFileFormatType.FORMAT_JNI;
    }
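
    /** Returns the range partition column names parsed from the LakeSoul partition spec. */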
    @Override
    protected List<String> getPathPartitionKeys() throws UserException {
        return new ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys);
    }

    @Override
    protected TableIf getTargetTable() throws UserException {
        return desc.getTable();
    }

    @Override
    protected Map<String, String> getLocationProperties() throws UserException {
        return lakeSoulExternalTable.getHadoopProperties();
    }
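
    /** Fills the LakeSoul-specific scan parameters of a single split into the file range descriptor. */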
    @SneakyThrows
    @Override
    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}", rangeDesc);
        }
        if (split instanceof LakeSoulSplit) {
            setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
        }
    }

    public ExternalCatalog getCatalog() {
        return lakeSoulExternalTable.getCatalog();
    }
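
    /**
     * A LakeSoul table is hash-bucketed when its properties contain
     * {@code LakeSoulOptions.HASH_BUCKET_NUM()} with a value other than "-1".
     */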
    public static boolean isExistHashPartition(TableInfo tif) {
        JSONObject tableProperties = JSON.parseObject(tif.getProperties());
        if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
                && tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) {
            return false;
        } else {
            return tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM());
        }
    }
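
    /**
     * Builds the {@link TLakeSoulFileDesc} for one split: file paths, primary keys, table schema and
     * partition descriptions, plus reader options holding an optional Substrait predicate pushed down
     * from the conjuncts and the S3/OSS connection settings taken from the catalog properties.
     */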
    private void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) throws IOException {
        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
        tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
        TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
        fileDesc.setFilePaths(lakeSoulSplit.getPaths());
        fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
        fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
        JSONObject options = new JSONObject();
        Plan predicate = LakeSoulUtils.getPushPredicate(
                conjuncts,
                tableName,
                tableArrowSchema,
                partitionArrowSchema,
                tableProperties,
                readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ()));
        if (predicate != null) {
            options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate));
        }
        Map<String, String> catalogProps = getCatalog().getProperties();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}", catalogProps);
        }
        if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) {
            options.put(LakeSoulUtils.FS_S3A_ENDPOINT, catalogProps.get(S3Properties.Env.ENDPOINT));
            if (catalogProps.containsKey(OssProperties.ENDPOINT)) {
                // Aliyun OSS requires virtual host style access
                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "false");
            } else {
                // use path style access for all other s3 compatible storage services
                options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true");
            }
            if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) {
                options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, catalogProps.get(S3Properties.Env.ACCESS_KEY));
            }
            if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) {
                options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, catalogProps.get(S3Properties.Env.SECRET_KEY));
            }
            if (catalogProps.get(S3Properties.Env.REGION) != null) {
                options.put(LakeSoulUtils.FS_S3A_REGION, catalogProps.get(S3Properties.Env.REGION));
            }
        }
        fileDesc.setOptions(JSON.toJSONString(options));
        fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
                .entrySet().stream()
                .map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue()))
                .collect(Collectors.toList()));
        tableFormatFileDesc.setLakesoulParams(fileDesc);
        rangeDesc.setTableFormatParams(tableFormatFileDesc);
    }
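
    /**
     * Lists the table's partitions, prunes them with the planner's column ranges, loads the matching
     * data files, and creates one split per group of files sharing a range partition and hash bucket.
     */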
    @Override
    public List<Split> getSplits(int numBackends) throws UserException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("getSplits with columnFilters={}", columnFilters);
            LOG.debug("getSplits with columnNameToRange={}", columnNameToRange);
            LOG.debug("getSplits with conjuncts={}", conjuncts);
        }
        List<PartitionInfo> allPartitionInfo = lakeSoulExternalTable.listPartitionInfo();
        if (LOG.isDebugEnabled()) {
            LOG.debug("allPartitionInfo={}", allPartitionInfo);
        }
        List<PartitionInfo> filteredPartitionInfo = allPartitionInfo;
        try {
            filteredPartitionInfo = LakeSoulUtils.applyPartitionFilters(
                    allPartitionInfo,
                    tableName,
                    partitionArrowSchema,
                    columnNameToRange
            );
        } catch (IOException e) {
            throw new UserException(e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo);
        }
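        // Load file metadata for the surviving partitions and group the files by range partition
        // description and hash bucket id; -1 is used when the table has no hash bucketing.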
        DataFileInfo[] dataFileInfos = DataOperation.getTableDataInfo(filteredPartitionInfo);
        List<Split> splits = new ArrayList<>();
        Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = new LinkedHashMap<>();
        TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo();
        for (DataFileInfo fileInfo : dataFileInfos) {
            if (isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() != -1) {
                splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>())
                        .computeIfAbsent(fileInfo.file_bucket_id(), v -> new ArrayList<>())
                        .add(fileInfo.path());
            } else {
                splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>())
                        .computeIfAbsent(-1, v -> new ArrayList<>())
                        .add(fileInfo.path());
            }
        }
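        // The partition spec has the form "<rangeKeys>;<hashKeys>"; a bare ";" means the table has
        // neither, otherwise the part after ';' lists the primary key columns.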
        List<String> pkKeys = null;
        if (!tableInfo.getPartitions().equals(";")) {
            pkKeys = Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(","));
        }
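        // Emit one LakeSoulSplit per (range partition, hash bucket) group; "-5" denotes a
        // non-partitioned table in LakeSoul metadata, so no range key=value pairs are parsed for it.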
        for (Map.Entry<String, Map<Integer, List<String>>> entry : splitByRangeAndHashPartition.entrySet()) {
            String rangeKey = entry.getKey();
            LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>();
            if (!rangeKey.equals("-5")) {
                String[] keys = rangeKey.split(",");
                for (String item : keys) {
                    String[] kv = item.split("=");
                    rangeDesc.put(kv[0], kv[1]);
                }
            }
            for (Map.Entry<Integer, List<String>> split : entry.getValue().entrySet()) {
                LakeSoulSplit lakeSoulSplit = new LakeSoulSplit(
                        split.getValue(),
                        pkKeys,
                        rangeDesc,
                        tableInfo.getTableSchema(),
                        0, 0, 0,
                        new String[0], null);
                lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
                splits.add(lakeSoulSplit);
            }
        }
        return splits;
    }
}