MaxComputeColumnValue.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.maxcompute;

import org.apache.doris.common.jni.vec.ColumnValue;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.NullableTimeStampMicroHolder;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.log4j.Logger;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;

/**
 * MaxCompute Column value in vector column
 */
public class MaxComputeColumnValue implements ColumnValue {
    private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
    private int idx;
    private ValueVector column;
    private ZoneId timeZone;

    public MaxComputeColumnValue() {
        idx = 0;
    }

    public void setColumnIdx(int idx) {
        this.idx = idx;
    }

    public MaxComputeColumnValue(ValueVector valueVector, int i) {
        this.column = valueVector;
        this.idx = i;
    }

    public MaxComputeColumnValue(ValueVector valueVector, int i, ZoneId timeZone) {
        this.column = valueVector;
        this.idx = i;
        this.timeZone = timeZone;
    }

    public void reset(ValueVector column) {
        this.column = column;
        this.idx = 0;
    }

    public void setTimeZone(ZoneId timeZone) {
        this.timeZone = timeZone;
    }

    @Override
    public boolean canGetStringAsBytes() {
        return true;
    }

    @Override
    public boolean isNull() {
        return column.isNull(idx);
    }

    @Override
    public boolean getBoolean() {
        BitVector bitCol = (BitVector) column;
        return bitCol.get(idx) != 0;
    }

    @Override
    public byte getByte() {
        TinyIntVector tinyIntCol = (TinyIntVector) column;
        return tinyIntCol.get(idx);
    }

    @Override
    public short getShort() {
        SmallIntVector smallIntCol = (SmallIntVector) column;
        return smallIntCol.get(idx);
    }

    @Override
    public int getInt() {
        IntVector intCol = (IntVector) column;
        return intCol.get(idx);
    }

    @Override
    public float getFloat() {
        Float4Vector floatCol = (Float4Vector) column;
        return floatCol.get(idx);
    }

    @Override
    public long getLong() {
        BigIntVector longCol = (BigIntVector) column;
        return longCol.get(idx);
    }

    @Override
    public double getDouble() {
        Float8Vector doubleCol = (Float8Vector) column;
        return doubleCol.get(idx);
    }

    @Override
    public BigInteger getBigInteger() {
        BigIntVector longCol = (BigIntVector) column;
        return BigInteger.valueOf(longCol.get(idx));
    }

    @Override
    public BigDecimal getDecimal() {
        DecimalVector decimalCol = (DecimalVector) column;
        return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx,
                    decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
    }

    /**
     * copy from arrow vector DecimalUtility.getBigDecimalFromArrowBuf
     * @param byteBuf byteBuf
     * @param index index
     * @param scale scale
     * @param byteWidth DecimalVector TYPE_WIDTH
     * @return java BigDecimal
     */
    public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf byteBuf, int index, int scale, int byteWidth) {
        byte[] value = new byte[byteWidth];
        byte temp;
        final long startIndex = (long) index * byteWidth;

        byteBuf.getBytes(startIndex, value, 0, byteWidth);
        if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
            // Decimal stored as native endian, need to swap bytes to make BigDecimal if native endian is LE
            int stop = byteWidth / 2;
            for (int i = 0, j; i < stop; i++) {
                temp = value[i];
                j = (byteWidth - 1) - i;
                value[i] = value[j];
                value[j] = temp;
            }
        }
        BigInteger unscaledValue = new BigInteger(value);
        return new BigDecimal(unscaledValue, scale);
    }

    @Override
    public String getString() {
        VarCharVector varcharCol = (VarCharVector) column;
        String v = varcharCol.getObject(idx).toString();
        return v == null ? new String(new byte[0]) : v;
    }



    public String getChar() {
        VarCharVector varcharCol = (VarCharVector) column;
        return varcharCol.getObject(idx).toString().stripTrailing();
    }

    // Maybe I can use `appendBytesAndOffset(byte[] src, int offset, int length)` to reduce the creation of byte[].
    // But I haven't figured out how to write it elegantly.
    public byte[] getCharAsBytes() {
        VarCharVector varcharCol = (VarCharVector) column;
        byte[] v = varcharCol.getObject(idx).getBytes();

        if (v == null) {
            return new byte[0];
        }

        int end = v.length - 1;
        while (end >= 0 && v[end] == ' ') {
            end--;
        }
        return (end == -1) ? new byte[0] : Arrays.copyOfRange(v, 0, end + 1);
    }


    @Override
    public byte[] getStringAsBytes() {
        VarCharVector varcharCol = (VarCharVector) column;
        byte[] v = varcharCol.getObject(idx).getBytes();
        return v == null ? new byte[0] : v;
    }

    @Override
    public LocalDate getDate() {
        DateDayVector dateCol = (DateDayVector) column;
        Integer intVal = dateCol.getObject(idx);
        return LocalDate.ofEpochDay(intVal == null ? 0 : intVal);
    }

    @Override
    public LocalDateTime getDateTime() {
        LocalDateTime result;

        ArrowType.Timestamp timestampType = ( ArrowType.Timestamp) column.getField().getFieldType().getType();
        if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) { //DATETIME
            result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
        } else if (timestampType.getTimezone() == null) { // TIMESTAMP_NTZ
            NullableTimeStampMicroHolder valueHoder = new NullableTimeStampMicroHolder();
            ((TimeStampMicroVector) column).get(idx, valueHoder);
            result = microsToInstant(valueHoder.value).atZone(java.time.ZoneOffset.UTC).toLocalDateTime();
        } else { // TIMESTAMP
            result = convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
        }
        return result;
    }

    @Override
    public byte[] getBytes() {
        VarBinaryVector binaryCol = (VarBinaryVector) column;
        byte[] v = binaryCol.getObject(idx);
        return v == null ? new byte[0] : v;
    }

    @Override
    public void unpackArray(List<ColumnValue> values) {
        ListVector listCol = (ListVector) column;
        int elemSize = listCol.getElementEndIndex(idx) - listCol.getElementStartIndex(idx);
        int offset = listCol.getElementStartIndex(idx);
        for (int i = 0; i < elemSize; i++) {
            MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset, timeZone);
            values.add(val);
            offset++;
        }
    }

    @Override
    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
        MapVector mapCol = (MapVector) column;
        int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
        int offset = mapCol.getElementStartIndex(idx);
        List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
        FieldVector keyList = innerCols.get(0);
        FieldVector valList = innerCols.get(1);
        for (int i = 0; i < elemSize; i++) {
            MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset, timeZone);
            keys.add(key);
            MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset, timeZone);
            values.add(val);
            offset++;
        }
    }

    @Override
    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
        StructVector structCol = (StructVector) column;
        List<FieldVector> innerCols = structCol.getChildrenFromFields();
        for (Integer fieldIndex : structFieldIndex) {
            MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx, timeZone);
            values.add(val);
        }
    }

    public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
        long timestampMillis = milliTZVector.get(index);
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
    }

    public LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector nanoTZVector, int index) {
        long timestampMicro = nanoTZVector.get(index);
        return microsToInstant(timestampMicro).atZone(timeZone).toLocalDateTime();
    }

    private static Instant microsToInstant(long timestampMicro) {
        long epochSecond = Math.floorDiv(timestampMicro, 1_000_000);
        long microAdjustment = timestampMicro - epochSecond * 1_000_000;
        return Instant.ofEpochSecond(epochSecond, microAdjustment * 1000);
    }
}