PaimonScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
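/**
 * Scan node for reading Apache Paimon tables.
 * <p>
 * Each Paimon split is read either by the native ORC/Parquet reader, when the
 * split can be converted to raw files, or by the JNI-based Paimon reader.
 * When a count(*) without grouping is pushed down, the node tries to answer it
 * from split metadata instead of scanning data files.
 */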
public class PaimonScanNode extends FileQueryScanNode {
private enum SplitReadType {
JNI,
NATIVE,
}
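    /** Per-split statistics reported in the VERBOSE explain output. */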
    private static class SplitStat {
SplitReadType type = SplitReadType.JNI;
private long rowCount = 0;
private boolean rawFileConvertable = false;
private boolean hasDeletionVector = false;
public void setType(SplitReadType type) {
this.type = type;
}
public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}
public void setRawFileConvertable(boolean rawFileConvertable) {
this.rawFileConvertable = rawFileConvertable;
}
public void setHasDeletionVector(boolean hasDeletionVector) {
this.hasDeletionVector = hasDeletionVector;
}
@Override
public String toString() {
return "SplitStat [type=" + type + ", rowCount=" + rowCount + ", rawFileConvertable=" + rawFileConvertable
+ ", hasDeletionVector=" + hasDeletionVector + "]";
}
}
private static final Logger LOG = LogManager.getLogger(PaimonScanNode.class);
private PaimonSource source = null;
private List<Predicate> predicates;
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private String serializedTable;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
SessionVariable sv) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv, sv);
}
@Override
protected void doInitialize() throws UserException {
super.doInitialize();
        source = new PaimonSource(desc);
        Preconditions.checkNotNull(source);
        serializedTable = encodeObjectToString(source.getPaimonTable());
params.setHistorySchemaInfo(new ConcurrentHashMap<>());
}
@VisibleForTesting
public void setSource(PaimonSource source) {
this.source = source;
}
@Override
protected void convertPredicate() {
PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter(
source.getPaimonTable().rowType());
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
}
    private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
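    /**
     * Serializes an object with Paimon's {@link InstantiationUtil} and encodes
     * it as URL-safe, unpadded Base64 so that it can be shipped to the BE as a
     * Thrift string.
     */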
public static <T> String encodeObjectToString(T t) {
try {
byte[] bytes = InstantiationUtil.serializeObject(t);
return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
if (split instanceof PaimonSplit) {
setPaimonParams(rangeDesc, (PaimonSplit) split);
}
}
@Override
protected Optional<String> getSerializedTable() {
return Optional.of(serializedTable);
}
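    /**
     * Builds a mapping from Paimon field id to lower-cased field name for the
     * given schema id. It is recorded as history schema info so that native
     * reads can resolve columns across schema evolution.
     */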
private Map<Integer, String> getSchemaInfo(Long schemaId) {
PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable();
TableSchema tableSchema = table.getPaimonSchemaCacheValue(schemaId).getTableSchema();
Map<Integer, String> columnIdToName = new HashMap<>(tableSchema.fields().size());
for (DataField dataField : tableSchema.fields()) {
columnIdToName.put(dataField.id(), dataField.name().toLowerCase());
}
return columnIdToName;
}
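    /**
     * Fills the Thrift range descriptor for a single split. JNI splits carry
     * the serialized Paimon split object, while native splits carry the file
     * format and schema id instead. A deletion file, if present, is attached
     * so that the BE can filter out deleted rows.
     */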
private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
String fileFormat = getFileFormat(paimonSplit.getPathString());
if (split != null) {
// use jni reader
rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
fileDesc.setPaimonSplit(encodeObjectToString(split));
rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
} else {
// use native reader
if (fileFormat.equals("orc")) {
rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
} else if (fileFormat.equals("parquet")) {
rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
} else {
throw new RuntimeException("Unsupported file format: " + fileFormat);
}
fileDesc.setSchemaId(paimonSplit.getSchemaId());
params.history_schema_info.computeIfAbsent(paimonSplit.getSchemaId(), this::getSchemaInfo);
}
fileDesc.setFileFormat(fileFormat);
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(",")));
fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
fileDesc.setPaimonOptions(((PaimonExternalCatalog) source.getCatalog()).getPaimonOptionsMap());
fileDesc.setTableName(source.getTargetTable().getName());
fileDesc.setCtlId(source.getCatalog().getId());
fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
fileDesc.setTblId(source.getTargetTable().getId());
fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
        // The hadoop conf should be the same as the one built in
        // PaimonExternalCatalog.createCatalog()#getConfiguration().
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
if (optDeletionFile.isPresent()) {
DeletionFile deletionFile = optDeletionFile.get();
TPaimonDeletionFileDesc tDeletionFile = new TPaimonDeletionFileDesc();
            // Convert the deletion file URI so that the file reader on the BE side can read it.
LocationPath locationPath = new LocationPath(deletionFile.path(),
source.getCatalog().getProperties());
String path = locationPath.toStorageLocation().toString();
tDeletionFile.setPath(path);
tDeletionFile.setOffset(deletionFile.offset());
tDeletionFile.setLength(deletionFile.length());
fileDesc.setDeletionFile(tDeletionFile);
}
if (paimonSplit.getRowCount().isPresent()) {
tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get());
}
tableFormatFileDesc.setPaimonParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
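    /**
     * Sums the row counts of all splits and subtracts the cardinalities of
     * their deletion vectors. Returns empty if any deletion vector has an
     * unknown cardinality, in which case the count cannot be pushed down.
     */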
@VisibleForTesting
    public static Optional<Long> calculateTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) {
        // The table-level count is computable only if every deletion vector
        // that is present reports a non-null cardinality.
long totalCount = 0;
long deletionVectorCount = 0;
for (org.apache.paimon.table.source.Split s : paimonSplits) {
totalCount += s.rowCount();
Optional<List<DeletionFile>> deletionFiles = s.deletionFiles();
if (deletionFiles.isPresent()) {
for (DeletionFile dv : deletionFiles.get()) {
if (dv != null) {
Long cardinality = dv.cardinality();
if (cardinality == null) {
                            // A deletion vector with unknown cardinality makes
                            // the table-level count incomputable.
return Optional.empty();
} else {
deletionVectorCount += cardinality;
}
}
}
}
}
return Optional.of(totalCount - deletionVectorCount);
}
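    /**
     * Generates Doris splits from the Paimon splits returned by the table API.
     * Splits that are convertible to raw ORC/Parquet files are split further by
     * size and read natively; all others fall back to the JNI reader. For count
     * pushdown, the table-level row count is spread over a small subset of the
     * splits when it can be derived from metadata alone.
     */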
@Override
public List<Split> getSplits(int numBackends) throws UserException {
boolean forceJniScanner = sessionVariable.isForceJniScanner();
SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType
.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
List<org.apache.paimon.table.source.Split> paimonSplits = getPaimonSplitFromAPI();
boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT;
        // Used only to count the number of selected partitions of this Paimon table.
Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
        // If applyCountPushdown is true, the file must not be split further,
        // because raw files and deletion vectors map one-to-one.
long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0);
for (org.apache.paimon.table.source.Split split : paimonSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(split.rowCount());
if (!forceJniScanner && split instanceof DataSplit) {
DataSplit dataSplit = (DataSplit) split;
BinaryRow partitionValue = dataSplit.partition();
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles();
if (supportNativeReader(optRawFiles)) {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
continue;
}
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
for (int i = 0; i < rawFiles.size(); i++) {
RawFile file = rawFiles.get(i);
LocationPath locationPath = new LocationPath(file.path(),
source.getCatalog().getProperties());
try {
List<Split> dorisSplits = FileSplitter.splitFile(
locationPath,
realFileSplitSize,
null,
file.length(),
-1,
true,
null,
PaimonSplit.PaimonSplitCreator.DEFAULT);
for (Split dorisSplit : dorisSplits) {
((PaimonSplit) dorisSplit).setSchemaId(file.schemaId());
// try to set deletion file
if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) {
((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i));
splitStat.setHasDeletionVector(true);
}
}
splits.addAll(dorisSplits);
++rawFileSplitNum;
} catch (IOException e) {
throw new UserException("Paimon error to split file: " + e.getMessage(), e);
}
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
splitStats.add(splitStat);
}
// We need to set the target size for all splits so that we can calculate the proportion of each split later.
splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
        // If applyCountPushdown is true, calculate the table-level row count for count pushdown.
if (applyCountPushdown) {
            // If no splits were generated, return the empty list directly and
            // skip the count pushdown handling.
if (splits.isEmpty()) {
return splits;
}
            Optional<Long> optTableLevelCount = calculateTableLevelCount(paimonSplits);
if (optTableLevelCount.isPresent()) {
long tableLevelRowCount = optTableLevelCount.get();
List<Split> pushDownCountSplits;
if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) {
int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends;
pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits));
} else {
pushDownCountSplits = Collections.singletonList(splits.get(0));
}
setPushDownCount(tableLevelRowCount);
assignCountToSplits(pushDownCountSplits, tableLevelRowCount);
return pushDownCountSplits;
}
}
this.selectedPartitionNum = selectedPartitionValues.size();
return splits;
}
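    /**
     * Plans splits through the Paimon read API, applying the converted
     * predicates and a projection of the queried columns. Returns an empty
     * list when the table has no snapshot to scan.
     */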
@VisibleForTesting
public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) {
            // An empty table (no snapshot in PaimonSnapshotCacheValue) has no splits.
return Collections.emptyList();
}
        List<String> lowerCaseFieldNames = source.getPaimonTable().rowType().getFieldNames().stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
        int[] projected = desc.getSlots().stream()
                .mapToInt(slot -> lowerCaseFieldNames.indexOf(slot.getColumn().getName()))
                .toArray();
ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
return readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
}
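    /**
     * Determines the file format from the path suffix, falling back to the
     * format declared in the table properties.
     */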
private String getFileFormat(String path) {
return FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
}
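    /**
     * A split can be read by the native reader only if it exposes raw files
     * and every raw file is ORC or Parquet.
     */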
@VisibleForTesting
public boolean supportNativeReader(Optional<List<RawFile>> optRawFiles) {
if (!optRawFiles.isPresent()) {
return false;
}
List<String> files = optRawFiles.get().stream().map(RawFile::path).collect(Collectors.toList());
for (String f : files) {
String splitFileFormat = getFileFormat(f);
if (!splitFileFormat.equals("orc") && !splitFileFormat.equals("parquet")) {
return false;
}
}
return true;
}
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
return TFileFormatType.FORMAT_JNI;
}
@Override
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
        // Intentionally not returning source.getPaimonTable().partitionKeys():
        // returning an empty list bypasses the path-based partition logic,
        // which does not apply to Paimon.
return new ArrayList<>();
}
@Override
public TableIf getTargetTable() {
return desc.getTable();
}
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
HashMap<String, String> map = new HashMap<>(source.getCatalog().getProperties());
source.getCatalog().getCatalogProperty().getHadoopProperties().forEach((k, v) -> {
if (!map.containsKey(k)) {
map.put(k, v);
}
});
return map;
}
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder sb = new StringBuilder(super.getNodeExplainString(prefix, detailLevel));
sb.append(String.format("%spaimonNativeReadSplits=%d/%d\n",
prefix, rawFileSplitNum, (paimonSplitNum + rawFileSplitNum)));
sb.append(prefix).append("predicatesFromPaimon:");
if (predicates.isEmpty()) {
sb.append(" NONE\n");
} else {
sb.append("\n");
for (Predicate predicate : predicates) {
sb.append(prefix).append(prefix).append(predicate).append("\n");
}
}
if (detailLevel == TExplainLevel.VERBOSE) {
sb.append(prefix).append("PaimonSplitStats: \n");
int size = splitStats.size();
if (size <= 4) {
for (SplitStat splitStat : splitStats) {
sb.append(String.format("%s %s\n", prefix, splitStat));
}
} else {
for (int i = 0; i < 3; i++) {
SplitStat splitStat = splitStats.get(i);
sb.append(String.format("%s %s\n", prefix, splitStat));
}
int other = size - 4;
sb.append(prefix).append(" ... other ").append(other).append(" paimon split stats ...\n");
SplitStat split = splitStats.get(size - 1);
sb.append(String.format("%s %s\n", prefix, split));
}
}
return sb.toString();
}
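    /**
     * Distributes the pushed-down row count evenly across the given splits,
     * assigning the remainder to the last split.
     */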
private void assignCountToSplits(List<Split> splits, long totalCount) {
int size = splits.size();
long countPerSplit = totalCount / size;
for (int i = 0; i < size - 1; i++) {
((PaimonSplit) splits.get(i)).setRowCount(countPerSplit);
}
((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size);
}
}