MaxComputeScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.maxcompute.source;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import org.apache.doris.datasource.property.constants.MCProperties;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
import com.aliyun.odps.table.optimizer.predicate.Predicate;
import com.aliyun.odps.table.read.TableBatchReadSession;
import com.aliyun.odps.table.read.TableReadSessionBuilder;
import com.aliyun.odps.table.read.split.InputSplitAssigner;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
import com.google.common.collect.Maps;
import jline.internal.Log;
import lombok.Setter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class MaxComputeScanNode extends FileQueryScanNode {
static final DateTimeFormatter dateTime3Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
static final DateTimeFormatter dateTime6Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private final MaxComputeExternalTable table;
private Predicate filterPredicate;
List<String> requiredPartitionColumns = new ArrayList<>();
List<String> orderedRequiredDataColumns = new ArrayList<>();
private int connectTimeout;
private int readTimeout;
private int retryTimes;
@Setter
private SelectedPartitions selectedPartitions = null;
private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());
// For new planner
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
SessionVariable sv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
selectedPartitions, needCheckColumnPriv, sv);
}
// For old planner
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
SessionVariable sv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
SelectedPartitions.NOT_PRUNED, needCheckColumnPriv, sv);
}
private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, SelectedPartitions selectedPartitions,
boolean needCheckColumnPriv, SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv);
table = (MaxComputeExternalTable) desc.getTable();
this.selectedPartitions = selectedPartitions;
}
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
if (split instanceof MaxComputeSplit) {
setScanParams(rangeDesc, (MaxComputeSplit) split);
}
}
private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
fileDesc.setPartitionSpec("deprecated");
fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
fileDesc.setSessionId(maxComputeSplit.getSessionId());
fileDesc.setReadTimeout(readTimeout);
fileDesc.setConnectTimeout(connectTimeout);
fileDesc.setRetryTimes(retryTimes);
tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]");
rangeDesc.setStartOffset(maxComputeSplit.getStart());
rangeDesc.setSize(maxComputeSplit.getLength());
}
private void createRequiredColumns() {
Set<String> requiredSlots =
desc.getSlots().stream().map(e -> e.getColumn().getName()).collect(Collectors.toSet());
Set<String> partitionColumns =
table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());
requiredPartitionColumns.clear();
orderedRequiredDataColumns.clear();
for (Column column : table.getColumns()) {
String columnName = column.getName();
if (!requiredSlots.contains(columnName)) {
continue;
}
if (partitionColumns.contains(columnName)) {
requiredPartitionColumns.add(columnName);
} else {
orderedRequiredDataColumns.add(columnName);
}
}
}
/**
* For no partition table: request requiredPartitionSpecs is empty
* For partition table: if requiredPartitionSpecs is empty, get all partition data.
*/
TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> requiredPartitionSpecs) throws IOException {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
readTimeout = mcCatalog.getReadTimeout();
connectTimeout = mcCatalog.getConnectTimeout();
retryTimes = mcCatalog.getRetryTimes();
TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
return scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
.withSettings(mcCatalog.getSettings())
.withSplitOptions(mcCatalog.getSplitOption())
.requiredPartitionColumns(requiredPartitionColumns)
.requiredDataColumns(orderedRequiredDataColumns)
.withFilterPredicate(filterPredicate)
.requiredPartitions(requiredPartitionSpecs)
.withArrowOptions(
ArrowOptions.newBuilder()
.withDatetimeUnit(TimestampUnit.MILLI)
.withTimestampUnit(TimestampUnit.MICRO)
.build()
).buildBatchReadSession();
}
@Override
public boolean isBatchMode() {
if (table.getPartitionColumns().isEmpty()) {
return false;
}
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return false;
}
int numPartitions = sessionVariable.getNumPartitionsInBatchMode();
return numPartitions > 0
&& selectedPartitions != SelectedPartitions.NOT_PRUNED
&& selectedPartitions.selectedPartitions.size() >= numPartitions;
}
@Override
public int numApproximateSplits() {
return selectedPartitions.selectedPartitions.size();
}
@Override
public void startSplit(int numBackends) {
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();
if (selectedPartitions.selectedPartitions.isEmpty()) {
//no need read any partition data.
return;
}
createRequiredColumns();
List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
selectedPartitions.selectedPartitions.forEach(
(key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
);
int batchNumPartitions = sessionVariable.getNumPartitionsInBatchMode();
Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
AtomicReference<UserException> batchException = new AtomicReference<>(null);
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
CompletableFuture.runAsync(() -> {
for (int beginIndex = 0; beginIndex < requiredPartitionSpecs.size(); beginIndex += batchNumPartitions) {
int endIndex = Math.min(beginIndex + batchNumPartitions, requiredPartitionSpecs.size());
if (batchException.get() != null || splitAssignment.isStop()) {
break;
}
List<PartitionSpec> requiredBatchPartitionSpecs = requiredPartitionSpecs.subList(beginIndex, endIndex);
int curBatchSize = endIndex - beginIndex;
try {
CompletableFuture.runAsync(() -> {
try {
TableBatchReadSession tableBatchReadSession =
createTableBatchReadSession(requiredBatchPartitionSpecs);
List<Split> batchSplit = getSplitByTableSession(tableBatchReadSession);
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(batchSplit);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.addAndGet(curBatchSize) == requiredPartitionSpecs.size()) {
splitAssignment.finishSchedule();
}
}
}, scheduleExecutor);
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
}
}, scheduleExecutor);
}
@Override
protected void convertPredicate() {
if (conjuncts.isEmpty()) {
this.filterPredicate = Predicate.NO_PREDICATE;
}
List<Predicate> odpsPredicates = new ArrayList<>();
for (Expr dorisPredicate : conjuncts) {
try {
odpsPredicates.add(convertExprToOdpsPredicate(dorisPredicate));
} catch (AnalysisException e) {
Log.warn("Failed to convert predicate " + dorisPredicate.toString() + "Reason: "
+ e.getMessage());
}
}
if (odpsPredicates.isEmpty()) {
this.filterPredicate = Predicate.NO_PREDICATE;
} else if (odpsPredicates.size() == 1) {
this.filterPredicate = odpsPredicates.get(0);
} else {
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND);
for (Predicate odpsPredicate : odpsPredicates) {
filterPredicate.addPredicate(odpsPredicate);
}
this.filterPredicate = filterPredicate;
}
}
private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException {
Predicate odpsPredicate = null;
if (expr instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator odpsOp;
switch (compoundPredicate.getOp()) {
case AND:
odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND;
break;
case OR:
odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.OR;
break;
case NOT:
odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.NOT;
break;
default:
throw new AnalysisException("Unknown operator: " + compoundPredicate.getOp());
}
List<Predicate> odpsPredicates = new ArrayList<>();
odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(0)));
if (compoundPredicate.getOp() != Operator.NOT) {
odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(1)));
}
odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(odpsOp, odpsPredicates);
} else if (expr instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) expr;
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp =
inPredicate.isNotIn()
? com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN
: com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.NOT_IN;
String columnName = convertSlotRefToColumnName(expr.getChild(0));
com.aliyun.odps.OdpsType odpsType = table.getColumnNameToOdpsColumn().get(columnName).getType();
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(columnName);
stringBuilder.append(" ");
stringBuilder.append(odpsOp.getDescription());
stringBuilder.append(" (");
for (int i = 1; i < inPredicate.getChildren().size(); i++) {
stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(i)));
if (i < inPredicate.getChildren().size() - 1) {
stringBuilder.append(", ");
}
}
stringBuilder.append(" )");
odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
} else if (expr instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator odpsOp;
switch (binaryPredicate.getOp()) {
case EQ: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.EQUALS;
break;
}
case NE: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.NOT_EQUALS;
break;
}
case GE: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN_OR_EQUAL;
break;
}
case LE: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN_OR_EQUAL;
break;
}
case LT: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN;
break;
}
case GT: {
odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN;
break;
}
default: {
odpsOp = null;
break;
}
}
if (odpsOp != null) {
String columnName = convertSlotRefToColumnName(expr.getChild(0));
com.aliyun.odps.OdpsType odpsType = table.getColumnNameToOdpsColumn().get(columnName).getType();
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(columnName);
stringBuilder.append(" ");
stringBuilder.append(odpsOp.getDescription());
stringBuilder.append(" ");
stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(1)));
odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
}
} else if (expr instanceof IsNullPredicate) {
IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator odpsOp =
isNullPredicate.isNotNull()
? com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.NOT_NULL
: com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.IS_NULL;
odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.UnaryPredicate(odpsOp,
new com.aliyun.odps.table.optimizer.predicate.Attribute(
convertSlotRefToColumnName(expr.getChild(0))
)
);
}
if (odpsPredicate == null) {
throw new AnalysisException("Do not support convert ["
+ expr.getExprName() + "] in convertExprToOdpsPredicate.");
}
return odpsPredicate;
}
private String convertSlotRefToColumnName(Expr expr) throws AnalysisException {
if (expr instanceof SlotRef) {
return ((SlotRef) expr).getColumnName();
} else if (expr instanceof CastExpr) {
if (expr.getChild(0) instanceof SlotRef) {
return ((SlotRef) expr.getChild(0)).getColumnName();
}
}
throw new AnalysisException("Do not support convert ["
+ expr.getExprName() + "] in convertSlotRefToAttribute.");
}
private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws AnalysisException {
if (!(expr instanceof LiteralExpr)) {
throw new AnalysisException("Do not support convert ["
+ expr.getExprName() + "] in convertSlotRefToAttribute.");
}
LiteralExpr literalExpr = (LiteralExpr) expr;
switch (odpsType) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case DECIMAL:
case FLOAT:
case DOUBLE: {
return " " + literalExpr.toString() + " ";
}
case STRING:
case CHAR:
case VARCHAR: {
return " \"" + literalExpr.toString() + "\" ";
}
case DATE: {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDateV2Type();
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
case DATETIME: {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(3);
return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime3Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
/**
* Disable the predicate pushdown to the odps API because the timestamp precision of odps is 9 and the
* mapping precision of Doris is 6. If we insert `2023-02-02 00:00:00.123456789` into odps, doris reads
* it as `2023-02-02 00:00:00.123456`. Since "789" is missing, we cannot push it down correctly.
*/
case TIMESTAMP: {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime6Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
case TIMESTAMP_NTZ: {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
break;
}
default: {
break;
}
}
throw new AnalysisException("Do not support convert odps type [" + odpsType + "] to odps values.");
}
public static String convertDateTimezone(String dateTimeStr, DateTimeFormatter formatter, ZoneId toZone) {
if (DateUtils.getTimeZone().equals(toZone)) {
return dateTimeStr;
}
LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter);
ZonedDateTime sourceZonedDateTime = localDateTime.atZone(DateUtils.getTimeZone());
ZonedDateTime targetZonedDateTime = sourceZonedDateTime.withZoneSameInstant(toZone);
return targetZonedDateTime.format(formatter);
}
@Override
public TFileFormatType getFileFormatType() {
return TFileFormatType.FORMAT_JNI;
}
@Override
public List<String> getPathPartitionKeys() {
return Collections.emptyList();
}
@Override
protected TableIf getTargetTable() throws UserException {
return table;
}
@Override
protected Map<String, String> getLocationProperties() throws UserException {
return new HashMap<>();
}
private List<Split> getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws IOException {
List<Split> result = new ArrayList<>();
String scanSessionSerialize = serializeSession(tableBatchReadSession);
InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner();
long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {
for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
MaxComputeSplit maxComputeSplit =
new MaxComputeSplit(BYTE_SIZE_PATH,
((IndexedInputSplit) split).getSplitIndex(), -1,
mcCatalog.getSplitByteSize(),
modificationTime, null,
Collections.emptyList());
maxComputeSplit.scanSerialize = scanSessionSerialize;
maxComputeSplit.splitType = SplitType.BYTE_SIZE;
maxComputeSplit.sessionId = split.getSessionId();
result.add(maxComputeSplit);
}
} else {
long totalRowCount = assigner.getTotalRowCount();
long recordsPerSplit = mcCatalog.getSplitRowCount();
for (long offset = 0; offset < totalRowCount; offset += recordsPerSplit) {
recordsPerSplit = Math.min(recordsPerSplit, totalRowCount - offset);
com.aliyun.odps.table.read.split.InputSplit split =
assigner.getSplitByRowOffset(offset, recordsPerSplit);
MaxComputeSplit maxComputeSplit =
new MaxComputeSplit(ROW_OFFSET_PATH,
offset, recordsPerSplit, totalRowCount, modificationTime, null,
Collections.emptyList());
maxComputeSplit.scanSerialize = scanSessionSerialize;
maxComputeSplit.splitType = SplitType.ROW_OFFSET;
maxComputeSplit.sessionId = split.getSessionId();
result.add(maxComputeSplit);
}
}
return result;
}
@Override
public List<Split> getSplits(int numBackends) throws UserException {
List<Split> result = new ArrayList<>();
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
createRequiredColumns();
List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
//if requiredPartitionSpecs is empty, get all partition data.
if (!table.getPartitionColumns().isEmpty() && selectedPartitions != SelectedPartitions.NOT_PRUNED) {
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();
if (selectedPartitions.selectedPartitions.isEmpty()) {
//no need read any partition data.
return result;
}
selectedPartitions.selectedPartitions.forEach(
(key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
);
}
try {
TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs);
result = getSplitByTableSession(tableBatchReadSession);
} catch (IOException e) {
throw new RuntimeException(e);
}
return result;
}
private static String serializeSession(Serializable object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(object);
byte[] serializedBytes = byteArrayOutputStream.toByteArray();
return Base64.getEncoder().encodeToString(serializedBytes);
}
}