PaimonUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.paimon;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.hive.HiveUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
public class PaimonUtil {
private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate predicate,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws IOException {
Map<String, String> options = new HashMap<>();
for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
options.put(pair.getKey().key(), pair.getValue());
}
if (!options.isEmpty()) {
table = table.copy(options);
}
ReadBuilder readBuilder = table.newReadBuilder();
if (projection != null) {
readBuilder.withProjection(projection);
}
if (predicate != null) {
readBuilder.withFilter(predicate);
}
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
InternalRowSerializer serializer =
new InternalRowSerializer(
projection == null
? table.rowType()
: Projection.of(projection).project(table.rowType()));
List<InternalRow> rows = new ArrayList<>();
reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
return rows;
}
public static PaimonPartitionInfo generatePartitionInfo(List<Column> partitionColumns,
List<Partition> paimonPartitions) {
if (CollectionUtils.isEmpty(partitionColumns) || paimonPartitions.isEmpty()) {
return PaimonPartitionInfo.EMPTY;
}
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
Map<String, Partition> nameToPartition = Maps.newHashMap();
PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition);
for (Partition partition : paimonPartitions) {
Map<String, String> spec = partition.spec();
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : spec.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append("/");
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
String partitionName = sb.toString();
nameToPartition.put(partitionName, partition);
try {
// partition values return by paimon api, may have problem,
// to avoid affecting the query, we catch exceptions here
nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns));
} catch (Exception e) {
LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionValues: {}",
partitionColumns, partition.spec(), e);
}
}
return partitionInfo;
}
public static ListPartitionItem toListPartitionItem(String partitionName, List<Column> partitionColumns)
throws AnalysisException {
List<Type> types = partitionColumns.stream()
.map(Column::getType)
.collect(Collectors.toList());
// Partition name will be in format: nation=cn/city=beijing
// parse it to get values "cn" and "beijing"
List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types);
List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size());
for (String partitionValue : partitionValues) {
// null will in partition 'null'
// "null" will in partition 'null'
// NULL will in partition 'null'
// "NULL" will in partition 'NULL'
// values.add(new PartitionValue(partitionValue, "null".equals(partitionValue)));
values.add(new PartitionValue(partitionValue, false));
}
PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true);
ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key));
return listPartitionItem;
}
private static Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
int tsScale = 3; // default
switch (dataType.getTypeRoot()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case BIGINT:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case SMALLINT:
return Type.SMALLINT;
case TINYINT:
return Type.TINYINT;
case VARCHAR:
return ScalarType.createVarcharType(((VarCharType) dataType).getLength());
case CHAR:
return ScalarType.createCharType(((CharType) dataType).getLength());
case BINARY:
case VARBINARY:
return Type.STRING;
case DECIMAL:
DecimalType decimal = (DecimalType) dataType;
return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP_WITHOUT_TIME_ZONE:
if (dataType instanceof org.apache.paimon.types.TimestampType) {
tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
} else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) {
tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
}
return ScalarType.createDatetimeV2Type(tsScale);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) {
tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
}
return ScalarType.createDatetimeV2Type(tsScale);
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType());
return org.apache.doris.catalog.ArrayType.create(innerType, true);
case MAP:
MapType mapType = (MapType) dataType;
return new org.apache.doris.catalog.MapType(
paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType()));
case ROW:
RowType rowType = (RowType) dataType;
List<DataField> fields = rowType.getFields();
return new org.apache.doris.catalog.StructType(fields.stream()
.map(field -> new org.apache.doris.catalog.StructField(field.name(),
paimonTypeToDorisType(field.type())))
.collect(Collectors.toCollection(ArrayList::new)));
case TIME_WITHOUT_TIME_ZONE:
return Type.UNSUPPORTED;
default:
LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot());
return Type.UNSUPPORTED;
}
}
public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
return paimonPrimitiveTypeToDorisType(type);
}
}