PaimonJniWriter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.paimon;

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.holders.NullableTimeStampMicroHolder;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarBinaryType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PaimonJniWriter {
    private static final Logger LOG = LoggerFactory.getLogger(PaimonJniWriter.class);

    private BatchTableWrite writer;
    private final BufferAllocator allocator;
    private final CommitMessageSerializer serializer = new CommitMessageSerializer();
    private String tableLocation;
    private static final String PAIMON_OPTION_PREFIX = "paimon.";
    private static final String HADOOP_OPTION_PREFIX = "hadoop.";

    private PreExecutionAuthenticator preExecutionAuthenticator;
    private final ClassLoader classLoader;

    private Map<String, String> paimonOptionParams;
    private Map<String, String> hadoopOptionParams;

    private List<DataField> paimonFields;
    private Map<String, DataField> paimonFieldMap;
    private DataType[] targetTypes;

    private IOManager ioManager;
    private HeapMemorySegmentPool memorySegmentPool;
    private boolean isDynamicBucketMode;

    public PaimonJniWriter() {
        this.allocator = new RootAllocator(Long.MAX_VALUE);
        this.classLoader = this.getClass().getClassLoader();
        try {
            setWarnLevel("org.apache.paimon.shade.org.apache.parquet");
            setWarnLevel("org.apache.paimon");
        } catch (Throwable t) {
            LOG.warn(t.getMessage(), t);
        }
    }

    private void setWarnLevel(String loggerName) {
        org.slf4j.Logger targetLogger = org.slf4j.LoggerFactory.getLogger(loggerName);
        try {
            Class<?> logbackLoggerClass = Class.forName("ch.qos.logback.classic.Logger");
            Class<?> levelClass = Class.forName("ch.qos.logback.classic.Level");
            if (!logbackLoggerClass.isInstance(targetLogger)) {
                return;
            }
            Object warnLevel = levelClass.getField("WARN").get(null);
            logbackLoggerClass.getMethod("setLevel", levelClass).invoke(targetLogger, warnLevel);
        } catch (Throwable t) {
            LOG.debug("set logger level skipped for {}", loggerName, t);
        }
    }

    /**
     * Opens the table and initializes the Writer
     */
    public void open(String tableLocation, Map<String, String> options, String[] columnNames) throws Exception {
        this.tableLocation = tableLocation;
        Thread.currentThread().setContextClassLoader(classLoader);
        paimonOptionParams = options.entrySet().stream()
                .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                .collect(Collectors
                        .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
                                kv1 -> kv1.getValue()));
        hadoopOptionParams = options.entrySet().stream()
                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
                .collect(Collectors
                        .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
                                kv1 -> kv1.getValue()));
        if (!paimonOptionParams.containsKey("warehouse") && tableLocation != null) {
            paimonOptionParams.put("warehouse", tableLocation);
        }
        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(options);
        preExecutionAuthenticator.execute(() -> {
            try {
                LOG.info("paimon: opening writer, location={}, opts_size={}", tableLocation, options.size());
                Table table;
                if (options.containsKey("serialized_table")) {
                    table = PaimonUtils.deserialize(options.get("serialized_table"));
                } else {
                    Catalog catalog = createCatalog(paimonOptionParams, hadoopOptionParams);
                    String dbName = options.getOrDefault("db_name", "default");
                    String tblName = options.getOrDefault("table_name", "paimon_table");
                    table = catalog.getTable(Identifier.create(dbName, tblName));
                }
                Map<String, String> dynamicOptions = new HashMap<>();
                if (options.containsKey("write-buffer-size")) {
                    dynamicOptions.put("write-buffer-size", options.get("write-buffer-size"));
                }
                copyIfPresent(options, dynamicOptions, "write-buffer-spillable");
                copyIfPresent(options, dynamicOptions, "write-buffer-spill.max-disk-size");
                copyIfPresent(options, dynamicOptions, "sort-spill-buffer-size");
                copyIfPresent(options, dynamicOptions, "sort-spill-threshold");
                copyIfPresent(options, dynamicOptions, "spill-compression");
                boolean enableJniCompact = Boolean.parseBoolean(
                        options.getOrDefault("paimon_use_jni_compact", "false"));
                boolean enableInlineCompact = enableJniCompact;
                if (enableJniCompact && table instanceof org.apache.paimon.table.FileStoreTable) {
                    int tableBuckets = ((org.apache.paimon.table.FileStoreTable) table).schema().numBuckets();
                    enableInlineCompact = tableBuckets > 0;
                }
                if (enableInlineCompact) {
                    LOG.info("paimon: enabling inline compaction for JNI writer");
                    dynamicOptions.put("write-only", "false");
                    dynamicOptions.put("num-sorted-run.compaction-trigger", "10");
                    dynamicOptions.put("compaction.max.worker-num", "8");
                } else {
                    LOG.info("paimon: write-only mode, compaction disabled");
                    dynamicOptions.put("write-only", "true");
                }
                table = table.copy(dynamicOptions);
                LOG.info("paimon: applied dynamic options to table: {}", dynamicOptions);
                this.paimonFields = table.rowType().getFields();
                this.paimonFieldMap = new HashMap<>();
                for (DataField f : this.paimonFields) {
                    this.paimonFieldMap.put(f.name(), f);
                }
                this.targetTypes = buildTargetTypes(columnNames);
                this.writer = table.newBatchWriteBuilder().newWrite();
                if (table instanceof org.apache.paimon.table.FileStoreTable) {
                    this.isDynamicBucketMode =
                            ((org.apache.paimon.table.FileStoreTable) table).schema().numBuckets() == -1;
                }
                boolean spillEnabled = Boolean.parseBoolean(options.getOrDefault("write-buffer-spillable", "false"));
                if (spillEnabled) {
                    String spillDir = options.get("paimon_jni_spill_dir");
                    if (spillDir != null && !spillDir.isEmpty()) {
                        File spillFile = new File(spillDir);
                        if (!spillFile.exists()) {
                            spillFile.mkdirs();
                        }
                        this.ioManager = new IOManagerImpl(spillDir);
                    } else {
                        this.ioManager = new IOManagerImpl(System.getProperty("java.io.tmpdir"));
                    }
                    long globalPoolSize = Long.parseLong(options.getOrDefault(
                            "paimon_global_memory_pool_size", "1073741824"));
                    this.memorySegmentPool = new HeapMemorySegmentPool(globalPoolSize, 32 * 1024);
                    this.writer.withIOManager(ioManager).withMemoryPool(memorySegmentPool);
                    LOG.info("paimon: spill enabled, spill_dir={}", spillDir);
                }
                return null;
            } catch (Throwable t) {
                throw contextException("open", "options_size=" + options.size(), t);
            }
        });
    }

    private void copyIfPresent(Map<String, String> options, Map<String, String> dynamicOptions, String key) {
        String value = options.get(key);
        if (value != null) {
            dynamicOptions.put(key, value);
        }
    }

    /**
     * Receives Arrow IPC memory address from C++, deserializes and writes to Paimon
     */
    public void write(long address, int length) throws Exception {
        preExecutionAuthenticator.execute(() -> {
            try {
                ByteBuffer directBuffer = getDirectBuffer(address, length);
                try (ArrowStreamReader reader = new ArrowStreamReader(
                        new DirectBufInputStream(directBuffer), allocator)) {
                    VectorSchemaRoot root = reader.getVectorSchemaRoot();
                    while (reader.loadNextBatch()) {
                        writeBatch(root);
                    }
                }
                return null;
            } catch (Throwable t) {
                throw contextException("write", "address=" + address + ", length=" + length, t);
            }
        });
    }

    private void writeBatch(VectorSchemaRoot root) throws Exception {
        int rowCount = root.getRowCount();
        if (rowCount == 0) {
            return;
        }
        List<Field> fields = root.getSchema().getFields();
        List<FieldVector> vectors = root.getFieldVectors();
        int colCount = fields.size();
        DataType[] currentTargetTypes = resolveTargetTypes(fields);
        GenericRow reusedRow = new GenericRow(colCount);
        for (int i = 0; i < rowCount; i++) {
            for (int col = 0; col < colCount; col++) {
                Field arrowField = fields.get(col);
                try {
                    reusedRow.setField(col, readArrowValue(vectors.get(col), i, arrowField, currentTargetTypes[col]));
                } catch (Throwable t) {
                    String fieldName = arrowField == null ? "null" : arrowField.getName();
                    String currentTargetType = currentTargetTypes[col] == null
                            ? "null"
                            : currentTargetTypes[col].asSQLString();
                    throw contextException("writeBatch.convert",
                            "row=" + i + ", col=" + col + ", field=" + fieldName + ", targetType="
                                    + currentTargetType + ", rowCount=" + rowCount + ", colCount=" + colCount,
                            t);
                }
            }
            try {
                if (isDynamicBucketMode) {
                    writer.write(reusedRow, 0);
                } else {
                    writer.write(reusedRow);
                }
            } catch (Throwable t) {
                throw contextException("writeBatch.write",
                        "row=" + i + ", rowCount=" + rowCount + ", colCount=" + colCount,
                        t);
            }
        }
    }

    private Object readArrowValue(FieldVector vector, int row, Field arrowField, DataType targetType) {
        if (vector == null || vector.isNull(row)) {
            return null;
        }
        if (targetType instanceof BinaryType || targetType instanceof VarBinaryType) {
            if (vector instanceof VarBinaryVector) {
                return ((VarBinaryVector) vector).get(row);
            }
            if (vector instanceof VarCharVector) {
                return ((VarCharVector) vector).get(row);
            }
        } else {
            if (vector instanceof VarCharVector) {
                return BinaryString.fromBytes(((VarCharVector) vector).get(row));
            }
        }
        if (vector instanceof BitVector) {
            return ((BitVector) vector).get(row) == 1;
        }
        if (vector instanceof TinyIntVector) {
            return ((TinyIntVector) vector).get(row);
        }
        if (vector instanceof SmallIntVector) {
            return ((SmallIntVector) vector).get(row);
        }
        if (vector instanceof IntVector) {
            return ((IntVector) vector).get(row);
        }
        if (vector instanceof BigIntVector) {
            return ((BigIntVector) vector).get(row);
        }
        if (vector instanceof Float4Vector) {
            return ((Float4Vector) vector).get(row);
        }
        if (vector instanceof Float8Vector) {
            return ((Float8Vector) vector).get(row);
        }
        if (vector instanceof DateDayVector) {
            return ((DateDayVector) vector).get(row);
        }
        if (vector instanceof TimeStampMicroVector) {
            NullableTimeStampMicroHolder holder = new NullableTimeStampMicroHolder();
            ((TimeStampMicroVector) vector).get(row, holder);
            return Timestamp.fromMicros(holder.value);
        }
        if (vector instanceof DecimalVector) {
            return readDecimal((DecimalVector) vector, row);
        }
        Object val = vector.getObject(row);
        return convertToPaimonType(val, arrowField, targetType);
    }

    private Decimal readDecimal(DecimalVector vector, int row) {
        BigDecimal bd = getBigDecimalFromArrowBuf(vector.getDataBuffer(), row,
                vector.getScale(), DecimalVector.TYPE_WIDTH);
        return Decimal.fromBigDecimal(bd, vector.getPrecision(), vector.getScale());
    }

    private Object convertToPaimonType(Object val, Field arrowField, DataType targetType) {
        if (val == null) {
            return null;
        }

        if (targetType instanceof BinaryType || targetType instanceof VarBinaryType) {
            if (val instanceof byte[]) {
                return val;
            } else if (val instanceof BinaryString) {
                return ((BinaryString) val).toBytes();
            } else if (val instanceof org.apache.arrow.vector.util.Text) {
                return ((org.apache.arrow.vector.util.Text) val).copyBytes();
            } else if (val instanceof org.apache.hadoop.io.Text) {
                org.apache.hadoop.io.Text t = (org.apache.hadoop.io.Text) val;
                byte[] bytes = new byte[t.getLength()];
                System.arraycopy(t.getBytes(), 0, bytes, 0, t.getLength());
                return bytes;
            } else if (val instanceof String) {
                return ((String) val).getBytes(java.nio.charset.StandardCharsets.UTF_8);
            } else {
                return val.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }
        }

        if (val instanceof BinaryString) {
            return val;
        }
        if (val instanceof byte[]) {
            return BinaryString.fromBytes((byte[]) val);
        }
        if (val instanceof org.apache.arrow.vector.util.Text) {
            return BinaryString.fromBytes(((org.apache.arrow.vector.util.Text) val).copyBytes());
        }
        if (val instanceof org.apache.hadoop.io.Text) {
            org.apache.hadoop.io.Text t = (org.apache.hadoop.io.Text) val;
            return BinaryString.fromBytes(t.getBytes(), 0, t.getLength());
        }
        if (val instanceof CharSequence) {
            return BinaryString.fromString(val.toString());
        }


        ArrowType arrowType = arrowField != null ? arrowField.getType() : null;
        ArrowType.ArrowTypeID typeID = arrowType != null ? arrowType.getTypeID() : null;

        if (val instanceof LocalDateTime) {
            return Timestamp.fromLocalDateTime((LocalDateTime) val);
        }
        if (val instanceof Long && typeID == ArrowType.ArrowTypeID.Timestamp) {
            return Timestamp.fromMicros((Long) val);
        }
        if (val instanceof java.time.LocalDate) {
            return (int) ((java.time.LocalDate) val).toEpochDay();
        }
        if (val instanceof Integer && typeID == ArrowType.ArrowTypeID.Date) {
            return val;
        }
        if (val instanceof BigDecimal) {
            BigDecimal bd = (BigDecimal) val;
            return Decimal.fromBigDecimal(bd, bd.precision(), bd.scale());
        }
        if (targetType instanceof RowType && val instanceof Map) {
            RowType rowType = (RowType) targetType;
            Map<?, ?> mapVal = (Map<?, ?>) val;
            List<DataField> childFields = rowType.getFields();
            GenericRow structRow = new GenericRow(childFields.size());

            for (int i = 0; i < childFields.size(); i++) {
                DataField childField = childFields.get(i);
                Object childVal = mapVal.get(childField.name());

                Field childArrowField = null;
                if (arrowField != null && arrowField.getChildren() != null) {
                    for (Field f : arrowField.getChildren()) {
                        if (f.getName().equals(childField.name())) {
                            childArrowField = f;
                            break;
                        }
                    }
                }
                structRow.setField(i, convertToPaimonType(childVal, childArrowField, childField.type()));
            }
            return structRow;
        }
        if (targetType instanceof MapType && val instanceof List) {
            MapType mapType = (MapType) targetType;
            List<?> list = (List<?>) val;
            Map<Object, Object> convertedMap = new HashMap<>();

            Field keyArrowField = null;
            Field valueArrowField = null;
            if (arrowField != null && !arrowField.getChildren().isEmpty()) {
                Field entriesStruct = arrowField.getChildren().get(0);
                if (entriesStruct.getChildren().size() >= 2) {
                    keyArrowField = entriesStruct.getChildren().get(0);
                    valueArrowField = entriesStruct.getChildren().get(1);
                }
            }

            String keyName = keyArrowField != null ? keyArrowField.getName() : "key";
            String valueName = valueArrowField != null ? valueArrowField.getName() : "value";

            for (Object element : list) {
                if (element instanceof Map) { // Arrow ��������� Struct ������������ Java Map
                    Map<?, ?> kvStruct = (Map<?, ?>) element;
                    Object k = convertToPaimonType(kvStruct.get(keyName), keyArrowField, mapType.getKeyType());
                    Object v = convertToPaimonType(kvStruct.get(valueName), valueArrowField, mapType.getValueType());
                    convertedMap.put(k, v);
                }
            }
            return new GenericMap(convertedMap);
        }
        if (targetType instanceof ArrayType && val instanceof List) {
            ArrayType arrayType = (ArrayType) targetType;
            List<?> list = (List<?>) val;
            Object[] convertedArray = new Object[list.size()];
            Field childArrowField = (arrowField != null && !arrowField.getChildren().isEmpty())
                    ? arrowField.getChildren().get(0) : null;

            for (int i = 0; i < list.size(); i++) {
                convertedArray[i] = convertToPaimonType(list.get(i), childArrowField, arrayType.getElementType());
            }
            return new GenericArray(convertedArray);
        }

        if (val instanceof byte[]) {
            return val;
        }

        return val;
    }

    public byte[][] prepareCommit() throws Exception {
        if (writer == null) {
            return new byte[0][];
        }
        return preExecutionAuthenticator.execute(() -> {
            try {
                List<CommitMessage> messages = writer.prepareCommit();

                if (messages == null || messages.isEmpty()) {
                    LOG.info("paimon: prepareCommit returns empty, location={}", tableLocation);
                    return new byte[0][];
                }
                LOG.info("paimon: prepareCommit returns {} messages", messages.size());
                final int maxPayloadBytes = 8 * 1024 * 1024;
                int chunkSize = 512;
                java.util.ArrayList<byte[]> payloads = new java.util.ArrayList<>();
                int i = 0;
                while (i < messages.size()) {
                    int end = Math.min(i + chunkSize, messages.size());
                    DataOutputSerializer outputView = new DataOutputSerializer(1024);
                    serializer.serializeList(messages.subList(i, end), outputView);
                    byte[] data = outputView.getCopyOfBuffer();
                    int len = data.length;
                    int version = serializer.getVersion();
                    byte[] payload = new byte[12 + len];
                    payload[0] = 'D';
                    payload[1] = 'P';
                    payload[2] = 'C';
                    payload[3] = 'M';
                    payload[4] = (byte) ((version >>> 24) & 0xFF);
                    payload[5] = (byte) ((version >>> 16) & 0xFF);
                    payload[6] = (byte) ((version >>> 8) & 0xFF);
                    payload[7] = (byte) (version & 0xFF);
                    payload[8] = (byte) ((len >>> 24) & 0xFF);
                    payload[9] = (byte) ((len >>> 16) & 0xFF);
                    payload[10] = (byte) ((len >>> 8) & 0xFF);
                    payload[11] = (byte) (len & 0xFF);
                    System.arraycopy(data, 0, payload, 12, len);
                    if (payload.length > maxPayloadBytes && chunkSize > 1) {
                        chunkSize = Math.max(1, chunkSize / 2);
                        continue;
                    }
                    payloads.add(payload);
                    i = end;
                }
                return payloads.toArray(new byte[0][]);
            } catch (Throwable t) {
                throw contextException("prepareCommit", "tableLocation=" + tableLocation, t);
            }
        });
    }

    public void abort() {
        try {
            if (preExecutionAuthenticator != null) {
                preExecutionAuthenticator.execute(() -> {
                    closeWriterResources(false);
                    return null;
                });
            }
        } catch (Exception e) {
            LOG.error("Failed to abort paimon writer", e);
        }
    }

    public void close() throws Exception {
        try {
            if (preExecutionAuthenticator != null) {
                preExecutionAuthenticator.execute(() -> {
                    closeWriterResources(true);
                    return null;
                });
            }
        } catch (Exception e) {
            LOG.warn("Error while closing PaimonJniWriter", e);
            throw e;
        }
    }

    private void closeWriterResources(boolean closeAllocator) throws Exception {
        try {
            if (writer != null) {
                writer.close();
                writer = null;
            }
            if (ioManager != null) {
                ioManager.close();
                ioManager = null;
            }
            memorySegmentPool = null;
            if (closeAllocator && allocator != null) {
                allocator.close();
            }
        } catch (Throwable t) {
            throw contextException("close", "closeAllocator=" + closeAllocator, t);
        }
    }

    private RuntimeException contextException(String phase, String detail, Throwable cause) {
        return new RuntimeException("Paimon JNI writer failed in phase=" + phase + ", detail={" + detail + "}, state={"
                + currentStateSummary() + "}", cause);
    }

    private DataType[] buildTargetTypes(String[] columnNames) {
        if (columnNames == null) {
            return null;
        }
        DataType[] result = new DataType[columnNames.length];
        for (int i = 0; i < columnNames.length; i++) {
            DataField field = paimonFieldMap.get(columnNames[i]);
            if (field != null) {
                result[i] = field.type();
            } else if (i < paimonFields.size()) {
                result[i] = paimonFields.get(i).type();
            }
        }
        return result;
    }

    private DataType[] resolveTargetTypes(List<Field> fields) {
        if (targetTypes != null && targetTypes.length == fields.size()) {
            return targetTypes;
        }
        DataType[] result = new DataType[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            Field arrowField = fields.get(i);
            DataField field = paimonFieldMap.get(arrowField.getName());
            if (field != null) {
                result[i] = field.type();
            } else if (i < paimonFields.size()) {
                result[i] = paimonFields.get(i).type();
            }
        }
        return result;
    }

    private BigDecimal getBigDecimalFromArrowBuf(org.apache.arrow.memory.ArrowBuf byteBuf,
            int index, int scale, int byteWidth) {
        byte[] value = new byte[byteWidth];
        byte temp;
        long startIndex = (long) index * byteWidth;
        byteBuf.getBytes(startIndex, value, 0, byteWidth);
        if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
            int stop = byteWidth / 2;
            for (int i = 0; i < stop; i++) {
                temp = value[i];
                int j = (byteWidth - 1) - i;
                value[i] = value[j];
                value[j] = temp;
            }
        }
        return new BigDecimal(new BigInteger(value), scale);
    }

    private String currentStateSummary() {
        return "tableLocation=" + tableLocation
                + ", writerNull=" + (writer == null)
                + ", ioManagerNull=" + (ioManager == null)
                + ", memoryPoolNull=" + (memorySegmentPool == null)
                + ", fieldCount=" + (paimonFields == null ? -1 : paimonFields.size())
                + ", fieldMapSize=" + (paimonFieldMap == null ? -1 : paimonFieldMap.size());
    }

    private ByteBuffer getDirectBuffer(long address, int length) throws Exception {
        Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
        java.lang.reflect.Constructor<?> ctor = cls.getDeclaredConstructor(long.class, int.class);
        ctor.setAccessible(true);
        return (ByteBuffer) ctor.newInstance(address, length);
    }

    private static class DirectBufInputStream extends InputStream {
        private final ByteBuffer buf;

        public DirectBufInputStream(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public int read() {
            return buf.hasRemaining() ? buf.get() & 0xFF : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (!buf.hasRemaining()) {
                return -1;
            }
            int toRead = Math.min(len, buf.remaining());
            buf.get(b, off, toRead);
            return toRead;
        }
    }

    private static Catalog createCatalog(
            Map<String, String> paimonOptionParams,
            Map<String, String> hadoopOptionParams) {
        Options options = new Options();
        paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
        Configuration configuration;
        configuration = new Configuration();
        hadoopOptionParams.entrySet().stream().forEach(kv -> configuration.set(kv.getKey(), kv.getValue()));
        String hadoopConfigPath = options.getString("hadoop-conf-dir", (String) null);
        if (hadoopConfigPath != null) {
            String coreSiteFile = String.format("%score-site.xml", hadoopConfigPath);
            Path coreSitePath = new Path(coreSiteFile);
            String hdfsSiteFile = String.format("%shdfs-site.xml", hadoopConfigPath);
            Path hdfsSitePath = new Path(hdfsSiteFile);
            configuration.addResource(coreSitePath);
            configuration.addResource(hdfsSitePath);
        }
        String hiveConfigPath = options.getString("hive-conf-dir", (String) null);
        if (hiveConfigPath != null) {
            String hiveSiteFile = String.format("%shive-site.xml", hiveConfigPath);
            Path hiveSitePath = new Path(hiveSiteFile);
            configuration.addResource(hiveSitePath);
        }
        paimonOptionParams.entrySet().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
        CatalogContext context = CatalogContext.create(options, configuration);
        return CatalogFactory.createCatalog(context);
    }

}