PaimonColumnValue.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.paimon;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
public class PaimonColumnValue implements ColumnValue {
private static final Logger LOG = LoggerFactory.getLogger(PaimonColumnValue.class);
private int idx;
private DataGetters record;
private ColumnType dorisType;
private DataType dataType;
private String timeZone;
public PaimonColumnValue() {
}
public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType, String timeZone) {
this.idx = idx;
this.record = record;
this.dorisType = columnType;
this.dataType = dataType;
this.timeZone = timeZone;
}
public void setIdx(int idx, ColumnType dorisType, DataType dataType) {
this.idx = idx;
this.dorisType = dorisType;
this.dataType = dataType;
}
public void setOffsetRow(InternalRow record) {
this.record = record;
}
public void setTimeZone(String timeZone) {
this.timeZone = timeZone;
}
@Override
public boolean canGetStringAsBytes() {
return true;
}
@Override
public boolean getBoolean() {
return record.getBoolean(idx);
}
@Override
public byte getByte() {
return record.getByte(idx);
}
@Override
public short getShort() {
return record.getShort(idx);
}
@Override
public int getInt() {
return record.getInt(idx);
}
@Override
public float getFloat() {
return record.getFloat(idx);
}
@Override
public long getLong() {
return record.getLong(idx);
}
@Override
public double getDouble() {
return record.getDouble(idx);
}
@Override
public BigInteger getBigInteger() {
return BigInteger.valueOf(record.getInt(idx));
}
@Override
public BigDecimal getDecimal() {
return record.getDecimal(idx, dorisType.getPrecision(), dorisType.getScale()).toBigDecimal();
}
@Override
public String getString() {
return record.getString(idx).toString();
}
@Override
public byte[] getStringAsBytes() {
return record.getString(idx).toBytes();
}
@Override
public LocalDate getDate() {
return LocalDate.ofEpochDay(record.getInt(idx));
}
@Override
public LocalDateTime getDateTime() {
Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision());
if (dataType instanceof LocalZonedTimestampType) {
return ts.toLocalDateTime().atZone(ZoneId.of("UTC"))
.withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime();
} else {
return ts.toLocalDateTime();
}
}
@Override
public boolean isNull() {
return record.isNullAt(idx);
}
@Override
public byte[] getBytes() {
return record.getBinary(idx);
}
@Override
public void unpackArray(List<ColumnValue> values) {
InternalArray recordArray = record.getArray(idx);
for (int i = 0; i < recordArray.size(); i++) {
PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i,
dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType(), timeZone);
values.add(arrayColumnValue);
}
}
@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
InternalMap map = record.getMap(idx);
InternalArray key = map.keyArray();
for (int i = 0; i < key.size(); i++) {
PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i,
dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType(), timeZone);
keys.add(keyColumnValue);
}
InternalArray value = map.valueArray();
for (int i = 0; i < value.size(); i++) {
PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i,
dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType(), timeZone);
values.add(valueColumnValue);
}
}
@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
// todo: support pruned struct fields
InternalRow row = record.getRow(idx, structFieldIndex.size());
for (int i : structFieldIndex) {
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i),
((RowType) dataType).getFields().get(i).type(), timeZone));
}
}
}