PluginDrivenScanNode.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.handle.ConnectorColumnHandle;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.PassthroughQueryTableHandle;
import org.apache.doris.connector.api.pushdown.ConnectorExpression;
import org.apache.doris.connector.api.pushdown.ConnectorFilterConstraint;
import org.apache.doris.connector.api.pushdown.FilterApplicationResult;
import org.apache.doris.connector.api.pushdown.LimitApplicationResult;
import org.apache.doris.connector.api.pushdown.ProjectionApplicationResult;
import org.apache.doris.connector.api.scan.ConnectorScanPlanProvider;
import org.apache.doris.connector.api.scan.ConnectorScanRange;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.THudiFileDesc;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc;
import org.apache.doris.thrift.TTransactionalHiveDesc;
import org.apache.doris.thrift.TTrinoConnectorFileDesc;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Generic scan node that delegates scan planning to the connector SPI.
 *
 * <p>Replaces connector-specific ScanNode subclasses for plugin-driven catalogs.
 * Extends {@link FileQueryScanNode} to reuse the existing split-to-Thrift
 * conversion pipeline. Uses {@code FORMAT_JNI} by default, which routes
 * to BE's JNI scanner framework.</p>
 *
 * <p>Scan flow:</p>
 * <ol>
 *   <li>{@link #getSplits} calls {@link ConnectorScanPlanProvider#planScan}
 *       to obtain {@link ConnectorScanRange}s from the connector plugin</li>
 *   <li>Each range is wrapped in a {@link PluginDrivenSplit}</li>
 *   <li>{@link FileQueryScanNode#createScanRangeLocations} distributes splits
 *       to backends</li>
 *   <li>{@link #setScanParams} populates {@link TTableFormatFileDesc} with
 *       connector-specific properties for each split</li>
 * </ol>
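 *
 * <p>For orientation, a minimal provider sketch on the plugin side, assuming only
 * the SPI signatures used in this file (class name and bodies are illustrative):</p>
 * <pre>{@code
 * public class MyScanPlanProvider implements ConnectorScanPlanProvider {
 *     public List<ConnectorScanRange> planScan(ConnectorSession session,
 *             ConnectorTableHandle handle, List<ConnectorColumnHandle> columns,
 *             Optional<ConnectorExpression> remainingFilter, long limit) {
 *         // Enumerate the connector's own splits and wrap each in a
 *         // ConnectorScanRange; an empty table yields an empty list.
 *         return Collections.emptyList();
 *     }
 *
 *     public Map<String, String> getScanNodeProperties(ConnectorSession session,
 *             ConnectorTableHandle handle, List<ConnectorColumnHandle> columns,
 *             Optional<ConnectorExpression> filter) {
 *         // Keys such as "file_format_type" are read back by this scan node.
 *         return Collections.singletonMap("file_format_type", "parquet");
 *     }
 * }
 * }</pre>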
 */
public class PluginDrivenScanNode extends FileQueryScanNode {

    private static final Logger LOG = LogManager.getLogger(PluginDrivenScanNode.class);

    private static final String TABLE_FORMAT_TYPE = "plugin_driven";

    /** Scan node property keys (shared with connector plugins). */
    private static final String PROP_FILE_FORMAT_TYPE = "file_format_type";
    private static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";
    private static final String PROP_LOCATION_PREFIX = "location.";
    private static final String PROP_HIVE_TEXT_PREFIX = "hive.text.";
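
    // Illustrative scan-node properties a connector might return (hypothetical values):
    //   file_format_type           -> "parquet"
    //   path_partition_keys        -> "dt,region"
    //   location.s3.endpoint       -> "<endpoint>"  (prefix is stripped before use)
    //   hive.text.column_separator -> ","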

    private final Connector connector;
    private final ConnectorSession connectorSession;

    // Current table handle. Starts as the handle resolved at construction and may be
    // replaced as filter/limit/projection pushdowns are applied.
    private ConnectorTableHandle currentHandle;

    // Lazily populated from ConnectorScanPlanProvider.getScanNodeProperties();
    // invalidated after filter pushdown so it is rebuilt against the updated handle.
    private Map<String, String> scanNodeProperties;

    public PluginDrivenScanNode(PlanNodeId id, TupleDescriptor desc,
            boolean needCheckColumnPriv, SessionVariable sv,
            ScanContext scanContext, Connector connector,
            ConnectorSession connectorSession, ConnectorTableHandle tableHandle) {
        super(id, desc, "PluginDrivenScanNode", scanContext, needCheckColumnPriv, sv);
        this.connector = connector;
        this.connectorSession = connectorSession;
        this.currentHandle = tableHandle;
    }

    /**
     * Creates a PluginDrivenScanNode by resolving the connector, session, and table handle
     * from the plugin-driven catalog and table.
     */
    public static PluginDrivenScanNode create(PlanNodeId id, TupleDescriptor desc,
            boolean needCheckColumnPriv, SessionVariable sv,
            ScanContext scanContext, PluginDrivenExternalCatalog catalog,
            PluginDrivenExternalTable table) {
        Connector connector = catalog.getConnector();
        ConnectorSession session = catalog.buildConnectorSession();
        ConnectorMetadata metadata = connector.getMetadata(session);
        String dbName = table.getDb() != null ? table.getDb().getRemoteName() : "";
        String tableName = table.getRemoteName();
        ConnectorTableHandle handle = metadata.getTableHandle(session, dbName, tableName)
                .orElseThrow(() -> new RuntimeException(
                        "Table handle not found for plugin-driven table: " + dbName + "." + tableName));
        return new PluginDrivenScanNode(id, desc, needCheckColumnPriv, sv,
                scanContext, connector, session, handle);
    }

    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        StringBuilder output = new StringBuilder();
        if (currentHandle instanceof PassthroughQueryTableHandle) {
            output.append(prefix).append("TABLE VALUE FUNCTION\n");
            String query = ((PassthroughQueryTableHandle) currentHandle).getQuery();
            output.append(prefix).append("QUERY: ").append(query).append("\n");
        } else {
            Map<String, String> props = getOrLoadScanNodeProperties();
            String query = props.get("query");
            output.append(prefix).append("TABLE: ")
                    .append(desc.getTable().getNameWithFullQualifiers()).append("\n");
            if (query != null) {
                output.append(prefix).append("QUERY: ").append(query).append("\n");
            }
            if (!conjuncts.isEmpty()) {
                Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
                output.append(prefix).append("PREDICATES: ")
                        .append(expr.accept(ExprToSqlVisitor.INSTANCE, ToSqlParams.WITH_TABLE))
                        .append("\n");
            }
        }
        if (useTopnFilter()) {
            String topnFilterSources = topnFilterSortNodes.stream()
                    .map(node -> String.valueOf(node.getId().asInt()))
                    .collect(Collectors.joining(","));
            output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
        }
        return output.toString();
    }

    @Override
    protected TFileFormatType getFileFormatType() throws UserException {
        Map<String, String> props = getOrLoadScanNodeProperties();
        String format = props.get(PROP_FILE_FORMAT_TYPE);
        if (format != null) {
            return mapFileFormatType(format);
        }
        return TFileFormatType.FORMAT_JNI;
    }

    @Override
    protected List<String> getPathPartitionKeys() throws UserException {
        Map<String, String> props = getOrLoadScanNodeProperties();
        String keys = props.get(PROP_PATH_PARTITION_KEYS);
        if (keys != null && !keys.isEmpty()) {
            return Arrays.asList(keys.split(","));
        }
        return Collections.emptyList();
    }

    @Override
    protected TableIf getTargetTable() throws UserException {
        return desc.getTable();
    }

    @Override
    protected Map<String, String> getLocationProperties() throws UserException {
        Map<String, String> props = getOrLoadScanNodeProperties();
        Map<String, String> locationProps = new HashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(PROP_LOCATION_PREFIX)) {
                String realKey = entry.getKey().substring(PROP_LOCATION_PREFIX.length());
                locationProps.put(realKey, entry.getValue());
            }
        }
        return locationProps;
    }

    @Override
    protected TFileAttributes getFileAttributes() throws UserException {
        Map<String, String> props = getOrLoadScanNodeProperties();
        String serDeLib = props.get(PROP_HIVE_TEXT_PREFIX + "serde_lib");
        if (serDeLib == null || serDeLib.isEmpty()) {
            return new TFileAttributes();
        }

        TFileAttributes attrs = new TFileAttributes();
        String skipLinesStr = props.get(PROP_HIVE_TEXT_PREFIX + "skip_lines");
        if (skipLinesStr != null) {
            try {
                attrs.setSkipLines(Integer.parseInt(skipLinesStr));
            } catch (NumberFormatException e) {
                // Malformed value: ignore and keep the default (no lines skipped).
            }
        }

        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
        String colSep = props.get(PROP_HIVE_TEXT_PREFIX + "column_separator");
        if (colSep != null) {
            textParams.setColumnSeparator(colSep);
        }
        String lineSep = props.get(PROP_HIVE_TEXT_PREFIX + "line_delimiter");
        if (lineSep != null) {
            textParams.setLineDelimiter(lineSep);
        }
        String mapkvDelim = props.get(PROP_HIVE_TEXT_PREFIX + "mapkv_delimiter");
        if (mapkvDelim != null) {
            textParams.setMapkvDelimiter(mapkvDelim);
        }
        String collDelim = props.get(PROP_HIVE_TEXT_PREFIX + "collection_delimiter");
        if (collDelim != null) {
            textParams.setCollectionDelimiter(collDelim);
        }
        String escape = props.get(PROP_HIVE_TEXT_PREFIX + "escape");
        if (escape != null && !escape.isEmpty()) {
            textParams.setEscape(escape.getBytes()[0]);
        }
        String nullFmt = props.get(PROP_HIVE_TEXT_PREFIX + "null_format");
        if (nullFmt != null) {
            textParams.setNullFormat(nullFmt);
        }
        String enclose = props.get(PROP_HIVE_TEXT_PREFIX + "enclose");
        if (enclose != null && !enclose.isEmpty()) {
            textParams.setEnclose(enclose.getBytes()[0]);
            attrs.setTrimDoubleQuotes(true);
        }

        attrs.setTextParams(textParams);
        attrs.setHeaderType("");
        attrs.setEnableTextValidateUtf8(sessionVariable.enableTextValidateUtf8);

        String isJson = props.get(PROP_HIVE_TEXT_PREFIX + "is_json");
        if ("true".equals(isJson)) {
            attrs.setReadJsonByLine(true);
            attrs.setReadByColumnDef(true);
        }

        return attrs;
    }

    /**
     * Attempts filter pushdown via the SPI applyFilter() protocol. On success the
     * table handle is replaced and, when no remaining filter is reported, the
     * conjuncts are cleared so predicates are not re-evaluated on the BE.
     */
    @Override
    protected void convertPredicate() {
        // Attempt filter pushdown via the connector SPI
        if (conjuncts == null || conjuncts.isEmpty()) {
            return;
        }
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        ConnectorFilterConstraint constraint = buildFilterConstraint(conjuncts);
        Optional<FilterApplicationResult<ConnectorTableHandle>> result =
                metadata.applyFilter(connectorSession, currentHandle, constraint);
        if (result.isPresent()) {
            FilterApplicationResult<ConnectorTableHandle> filterResult = result.get();
            currentHandle = filterResult.getHandle();

            // Consume remainingFilter to avoid duplicate predicate evaluation on BE:
            // - null means all predicates were fully pushed down -> clear conjuncts
            // - non-null means some/all predicates remain -> keep conjuncts (conservative)
            ConnectorExpression remaining = filterResult.getRemainingFilter();
            if (remaining == null) {
                conjuncts.clear();
                LOG.debug("Filter fully pushed down for plugin-driven scan, cleared conjuncts");
            } else {
                // Partial or full remaining: keep all conjuncts for BE-side evaluation.
                // Fine-grained conjunct removal (matching individual remaining sub-expressions
                // back to original Expr conjuncts) is deferred to a future enhancement.
                LOG.debug("Filter pushdown accepted with remaining filter, keeping conjuncts");
            }
        }
        // Invalidate cached properties so they are rebuilt with the updated conjuncts/handle.
        scanNodeProperties = null;
    }

    /**
     * Attempts to push the limit down via the SPI applyLimit() protocol.
     * Called before getSplits(), after filter pushdown.
     *
     * <p>If the connector accepts the limit, the handle is updated.
     * The limit is still passed to planScan() as a parameter for
     * connectors that handle limit directly in planScan().</p>
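     *
     * <p>A connector-side sketch for orientation; only the call shape used in
     * this file is taken from the source, and the result construction below is
     * an assumption:</p>
     * <pre>{@code
     * public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(
     *         ConnectorSession session, ConnectorTableHandle handle, long limit) {
     *     if (!canPushLimit(handle)) {   // hypothetical capability check
     *         return Optional.empty();   // FE keeps enforcing the limit
     *     }
     *     // Assumed constructor: wraps a handle that now carries the limit.
     *     return Optional.of(new LimitApplicationResult<>(withLimit(handle, limit)));
     * }
     * }</pre>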
     */
    private void tryPushDownLimit() {
        if (limit <= 0) {
            return;
        }
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        Optional<LimitApplicationResult<ConnectorTableHandle>> result =
                metadata.applyLimit(connectorSession, currentHandle, limit);
        if (result.isPresent()) {
            currentHandle = result.get().getHandle();
            LOG.debug("Limit {} pushed down via applyLimit for plugin-driven scan", limit);
        }
    }

    /**
     * Attempts to push the projection down via the SPI applyProjection() protocol.
     * Called before getSplits(), after filter and limit pushdown.
     *
     * <p>If the connector accepts the projection, the handle is updated.</p>
     */
    private void tryPushDownProjection(List<ConnectorColumnHandle> columns) {
        if (columns.isEmpty()) {
            return;
        }
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        Optional<ProjectionApplicationResult<ConnectorTableHandle>> result =
                metadata.applyProjection(connectorSession, currentHandle, columns);
        if (result.isPresent()) {
            currentHandle = result.get().getHandle();
            LOG.debug("Projection pushed down via applyProjection for plugin-driven scan");
        }
    }

    @Override
    public List<Split> getSplits(int numBackends) throws UserException {
        // Attempt limit and projection pushdown via SPI protocol
        tryPushDownLimit();

        ConnectorScanPlanProvider scanProvider = connector.getScanPlanProvider();
        if (scanProvider == null) {
            LOG.warn("Connector does not provide a scan plan provider, returning empty splits");
            return Collections.emptyList();
        }

        List<ConnectorColumnHandle> columns = buildColumnHandles();
        tryPushDownProjection(columns);
        Optional<ConnectorExpression> remainingFilter = buildRemainingFilter();

        List<ConnectorScanRange> ranges = scanProvider.planScan(
                connectorSession, currentHandle, columns, remainingFilter, limit);

        List<Split> splits = new ArrayList<>(ranges.size());
        for (ConnectorScanRange range : ranges) {
            splits.add(new PluginDrivenSplit(range));
        }
        return splits;
    }

    @Override
    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
        if (!(split instanceof PluginDrivenSplit)) {
            return;
        }
        PluginDrivenSplit pluginSplit = (PluginDrivenSplit) split;
        ConnectorScanRange scanRange = pluginSplit.getConnectorScanRange();

        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
        tableFormatFileDesc.setTableFormatType(scanRange.getTableFormatType());

        String formatType = scanRange.getTableFormatType();
        if ("max_compute".equals(formatType)) {
            setMaxComputeParams(tableFormatFileDesc, scanRange, rangeDesc);
        } else if ("trino_connector".equals(formatType)) {
            setTrinoConnectorParams(tableFormatFileDesc, scanRange);
        } else if ("hive".equals(formatType)) {
            setHiveParams(tableFormatFileDesc, scanRange);
        } else if ("transactional_hive".equals(formatType)) {
            setTransactionalHiveParams(tableFormatFileDesc, scanRange);
        } else if ("hudi".equals(formatType)) {
            setHudiParams(tableFormatFileDesc, scanRange, rangeDesc);
        } else if ("paimon".equals(formatType)) {
            setPaimonParams(tableFormatFileDesc, scanRange, rangeDesc);
        } else {
            setGenericParams(tableFormatFileDesc, scanRange);
        }

        rangeDesc.setTableFormatParams(tableFormatFileDesc);
    }

    /**
     * Sets MaxCompute-specific scan params via TMaxComputeFileDesc.
     * BE expects typed Thrift fields, not a generic Map.
     */
    private void setMaxComputeParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange, TFileRangeDesc rangeDesc) {
        Map<String, String> props = scanRange.getProperties();
        TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
        fileDesc.setPartitionSpec("deprecated");
        fileDesc.setTableBatchReadSession(
                props.getOrDefault("table_batch_read_session", ""));
        fileDesc.setSessionId(props.getOrDefault("session_id", ""));
        fileDesc.setReadTimeout(
                Integer.parseInt(props.getOrDefault("read_timeout", "120")));
        fileDesc.setConnectTimeout(
                Integer.parseInt(props.getOrDefault("connect_timeout", "10")));
        fileDesc.setRetryTimes(
                Integer.parseInt(props.getOrDefault("retry_times", "4")));
        formatDesc.setMaxComputeParams(fileDesc);

        // There is no real file path for a MaxCompute read; the row range is
        // encoded into the path string, while startOffset/size carry the bounds.
        rangeDesc.setPath("[ " + scanRange.getStart() + " , " + scanRange.getLength() + " ]");
        rangeDesc.setStartOffset(scanRange.getStart());
        rangeDesc.setSize(scanRange.getLength());
    }

    /**
     * Sets Trino connector-specific scan params via TTrinoConnectorFileDesc.
     * All values are pre-serialized JSON strings from the plugin module.
     */
    private void setTrinoConnectorParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange) {
        Map<String, String> props = scanRange.getProperties();
        TTrinoConnectorFileDesc fileDesc = new TTrinoConnectorFileDesc();
        fileDesc.setCatalogName(props.getOrDefault("catalog_name", ""));
        fileDesc.setDbName(props.getOrDefault("db_name", ""));
        fileDesc.setTableName(props.getOrDefault("table_name", ""));
        fileDesc.setTrinoConnectorSplit(
                props.getOrDefault("trino_connector_split", ""));
        fileDesc.setTrinoConnectorTableHandle(
                props.getOrDefault("trino_connector_table_handle", ""));
        fileDesc.setTrinoConnectorColumnHandles(
                props.getOrDefault("trino_connector_column_handles", ""));
        fileDesc.setTrinoConnectorColumnMetadata(
                props.getOrDefault("trino_connector_column_metadata", ""));
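        // Note: the "Trascation" spelling follows the Thrift field definition.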
        fileDesc.setTrinoConnectorTrascationHandle(
                props.getOrDefault("trino_connector_trascation_handle", ""));

        // Options arrive as a JSON-serialized map; parse into a Map<String, String>.
        String optionsJson = props.getOrDefault("trino_connector_options", "{}");
        try {
            Map<String, String> options = new ObjectMapper().readValue(
                    optionsJson, new TypeReference<Map<String, String>>() {});
            fileDesc.setTrinoConnectorOptions(options);
        } catch (Exception e) {
            LOG.warn("Failed to parse trino_connector_options JSON, using empty map", e);
            fileDesc.setTrinoConnectorOptions(new HashMap<>());
        }
        formatDesc.setTrinoConnectorParams(fileDesc);
    }

    /**
     * Sets generic scan params via jdbc_params map (for JDBC, plugin_driven, etc.).
     */
    private void setGenericParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange) {
        Map<String, String> props = new HashMap<>(scanRange.getProperties());
        props.put("connector_scan_range_type", scanRange.getRangeType().name());
        props.put("connector_file_format", scanRange.getFileFormat());

        Map<String, String> partValues = scanRange.getPartitionValues();
        if (partValues != null && !partValues.isEmpty()) {
            for (Map.Entry<String, String> entry : partValues.entrySet()) {
                props.put("partition." + entry.getKey(), entry.getValue());
            }
        }

        formatDesc.setJdbcParams(props);
    }

    /**
     * Sets Hive-specific scan params. For non-transactional Hive tables,
     * the TTableFormatFileDesc carries "hive" as the table format type.
     * The file path/offset/length are already set by the FileSplit pipeline.
     */
    private void setHiveParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange) {
        // For non-transactional hive, minimal params needed.
        // The file format, partition values, and text properties are
        // handled at the scan-node level via getFileFormatType(),
        // getPathPartitionKeys(), and getFileAttributes().
        // No per-split TTableFormatFileDesc fields needed beyond the type.
    }

    /**
     * Sets transactional Hive (ACID) scan params. Populates
     * TTransactionalHiveDesc with partition location and delete deltas.
     */
    private void setTransactionalHiveParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange) {
        Map<String, String> props = scanRange.getProperties();
        TTransactionalHiveDesc txnDesc = new TTransactionalHiveDesc();

        String partLoc = props.get("acid.partition_location");
        if (partLoc != null) {
            txnDesc.setPartition(partLoc);
        }

        String countStr = props.get("acid.delete_delta_count");
        if (countStr != null) {
            int count = Integer.parseInt(countStr);
            List<TTransactionalHiveDeleteDeltaDesc> deltas = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                String deltaStr = props.get("acid.delete_delta." + i);
                if (deltaStr != null) {
                    TTransactionalHiveDeleteDeltaDesc delta = new TTransactionalHiveDeleteDeltaDesc();
                    // Only the directory location before the optional '|' separator is used.
                    delta.setDirectoryLocation(deltaStr.contains("|")
                            ? deltaStr.substring(0, deltaStr.indexOf('|'))
                            : deltaStr);
                    deltas.add(delta);
                }
            }
            txnDesc.setDeleteDeltas(deltas);
        }

        formatDesc.setTransactionalHiveParams(txnDesc);
    }

    /**
     * Sets Hudi-specific scan params via THudiFileDesc.
     * Handles dynamic format downgrade: MOR splits with no delta logs
     * fall back to native Parquet/ORC reader.
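     * For example, a MOR base file such as {@code .../base.parquet} (path
     * illustrative) with no accompanying log files is read natively instead of
     * through the JNI merge reader.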
     */
    private void setHudiParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange, TFileRangeDesc rangeDesc) {
        Map<String, String> props = scanRange.getProperties();
        THudiFileDesc fileDesc = new THudiFileDesc();

        String fileFormat = scanRange.getFileFormat();
        boolean isJni = "jni".equalsIgnoreCase(fileFormat);

        // Dynamic format downgrade: if JNI but no delta logs, use native reader
        if (isJni) {
            String deltaLogs = props.get("hudi.delta_logs");
            if (deltaLogs == null || deltaLogs.isEmpty()) {
                String dataFilePath = props.getOrDefault("hudi.data_file_path", "");
                if (!dataFilePath.isEmpty()) {
                    String lower = dataFilePath.toLowerCase();
                    if (lower.endsWith(".parquet")) {
                        rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
                        isJni = false;
                    } else if (lower.endsWith(".orc")) {
                        rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
                        isJni = false;
                    }
                }
            }
        }

        if (isJni) {
            // JNI reader: full metadata needed for Hudi merge reader
            fileDesc.setInstantTime(props.getOrDefault("hudi.instant_time", ""));
            fileDesc.setSerde(props.getOrDefault("hudi.serde", ""));
            fileDesc.setInputFormat(props.getOrDefault("hudi.input_format", ""));
            fileDesc.setBasePath(props.getOrDefault("hudi.base_path", ""));
            fileDesc.setDataFilePath(props.getOrDefault("hudi.data_file_path", ""));
            fileDesc.setDataFileLength(
                    Long.parseLong(props.getOrDefault("hudi.data_file_length", "0")));

            String deltaLogs = props.get("hudi.delta_logs");
            if (deltaLogs != null && !deltaLogs.isEmpty()) {
                fileDesc.setDeltaLogs(Arrays.asList(deltaLogs.split(",")));
            }
            String colNames = props.get("hudi.column_names");
            if (colNames != null && !colNames.isEmpty()) {
                fileDesc.setColumnNames(Arrays.asList(colNames.split(",")));
            }
            String colTypes = props.get("hudi.column_types");
            if (colTypes != null && !colTypes.isEmpty()) {
                fileDesc.setColumnTypes(Arrays.asList(colTypes.split(",")));
            }
        }

        formatDesc.setHudiParams(fileDesc);

        // Set partition values for path-based partition extraction
        Map<String, String> partValues = scanRange.getPartitionValues();
        if (partValues != null && !partValues.isEmpty()) {
            List<String> pathKeys = new ArrayList<>();
            List<String> pathValues = new ArrayList<>();
            for (Map.Entry<String, String> entry : partValues.entrySet()) {
                pathKeys.add(entry.getKey());
                pathValues.add(entry.getValue());
            }
            rangeDesc.setColumnsFromPathKeys(pathKeys);
            rangeDesc.setColumnsFromPath(pathValues);
        }
    }

    /**
     * Sets Paimon-specific scan params via TPaimonFileDesc.
     * Handles both JNI reader (serialized split) and native reader (file path) paths.
     */
    private void setPaimonParams(TTableFormatFileDesc formatDesc,
            ConnectorScanRange scanRange, TFileRangeDesc rangeDesc) {
        Map<String, String> props = scanRange.getProperties();
        TPaimonFileDesc fileDesc = new TPaimonFileDesc();

        String paimonSplit = props.get("paimon.split");
        if (paimonSplit != null) {
            // JNI reader path
            rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
            fileDesc.setPaimonSplit(paimonSplit);
            String tableLocation = props.get("paimon.table_location");
            if (tableLocation != null) {
                fileDesc.setPaimonTable(tableLocation);
            }
            String weightStr = props.get("paimon.self_split_weight");
            if (weightStr != null) {
                rangeDesc.setSelfSplitWeight(Long.parseLong(weightStr));
            }
        } else {
            // Native reader path: map the scan range's file format to a Thrift format type.
            String fileFormat = scanRange.getFileFormat();
            if ("orc".equals(fileFormat)) {
                rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
            } else if ("parquet".equals(fileFormat)) {
                rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
            }
            String schemaIdStr = props.get("paimon.schema_id");
            if (schemaIdStr != null) {
                fileDesc.setSchemaId(Long.parseLong(schemaIdStr));
            }
        }

        fileDesc.setFileFormat(scanRange.getFileFormat());

        // Deletion file
        String deletionPath = props.get("paimon.deletion_file.path");
        if (deletionPath != null) {
            TPaimonDeletionFileDesc deletionFile = new TPaimonDeletionFileDesc();
            deletionFile.setPath(deletionPath);
            deletionFile.setOffset(Long.parseLong(
                    props.getOrDefault("paimon.deletion_file.offset", "0")));
            deletionFile.setLength(Long.parseLong(
                    props.getOrDefault("paimon.deletion_file.length", "0")));
            fileDesc.setDeletionFile(deletionFile);
        }

        // Row count for count pushdown
        String rowCountStr = props.get("paimon.row_count");
        if (rowCountStr != null) {
            formatDesc.setTableLevelRowCount(Long.parseLong(rowCountStr));
        } else {
            formatDesc.setTableLevelRowCount(-1);
        }

        formatDesc.setPaimonParams(fileDesc);

        // Partition values
        Map<String, String> partValues = scanRange.getPartitionValues();
        if (partValues != null && !partValues.isEmpty()) {
            List<String> pathKeys = new ArrayList<>();
            List<String> pathValues = new ArrayList<>();
            List<Boolean> pathIsNull = new ArrayList<>();
            for (Map.Entry<String, String> entry : partValues.entrySet()) {
                pathKeys.add(entry.getKey());
                pathValues.add(entry.getValue() != null ? entry.getValue() : "");
                pathIsNull.add(entry.getValue() == null);
            }
            rangeDesc.setColumnsFromPathKeys(pathKeys);
            rangeDesc.setColumnsFromPath(pathValues);
            rangeDesc.setColumnsFromPathIsNull(pathIsNull);
        }
    }

    @Override
    protected Optional<String> getSerializedTable() {
        return Optional.ofNullable(getOrLoadScanNodeProperties().get("paimon.serialized_table"));
    }

    @Override
    public void createScanRangeLocations() throws UserException {
        super.createScanRangeLocations();
        setPaimonScanLevelParams();
    }

    /**
     * Sets Paimon scan-level params (predicate + options) on the TFileScanRangeParams.
     * These apply to all splits, not per-split.
     */
    private void setPaimonScanLevelParams() {
        Map<String, String> props = getOrLoadScanNodeProperties();
        String predicate = props.get("paimon.predicate");
        if (predicate != null) {
            params.setPaimonPredicate(predicate);
        }

        String optionsJson = props.get("paimon.options_json");
        if (optionsJson != null && !optionsJson.isEmpty()) {
            try {
                Map<String, String> options = new ObjectMapper().readValue(
                        optionsJson, new TypeReference<Map<String, String>>() {});
                params.setPaimonOptions(options);
            } catch (Exception e) {
                LOG.warn("Failed to parse paimon.options_json, skipping Paimon options", e);
            }
        }
    }

    /**
     * Lazily loads scan node properties from the connector's scan plan provider.
     */
    private Map<String, String> getOrLoadScanNodeProperties() {
        if (scanNodeProperties == null) {
            ConnectorScanPlanProvider scanProvider = connector.getScanPlanProvider();
            if (scanProvider != null) {
                List<ConnectorColumnHandle> columns = buildColumnHandles();
                Optional<ConnectorExpression> filter = buildRemainingFilter();
                scanNodeProperties = scanProvider.getScanNodeProperties(
                        connectorSession, currentHandle, columns, filter);
            }
            if (scanNodeProperties == null) {
                scanNodeProperties = Collections.emptyMap();
            }
        }
        return scanNodeProperties;
    }

    /**
     * Maps a file format name string to the corresponding TFileFormatType.
     */
    private static TFileFormatType mapFileFormatType(String format) {
        switch (format.toLowerCase()) {
            case "parquet":
                return TFileFormatType.FORMAT_PARQUET;
            case "orc":
                return TFileFormatType.FORMAT_ORC;
            case "text":
            case "csv":
                return TFileFormatType.FORMAT_CSV_PLAIN;
            case "json":
                return TFileFormatType.FORMAT_JSON;
            case "avro":
                return TFileFormatType.FORMAT_AVRO;
            default:
                return TFileFormatType.FORMAT_JNI;
        }
    }

    /**
     * Builds column handles from the tuple descriptor's slot descriptors.
     * These tell the connector which columns are needed for the query,
     * enabling optimized column selection (e.g., SELECT col1, col2 instead of SELECT *).
     */
    private List<ConnectorColumnHandle> buildColumnHandles() {
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        Map<String, ConnectorColumnHandle> allHandles =
                metadata.getColumnHandles(connectorSession, currentHandle);
        if (allHandles.isEmpty()) {
            return Collections.emptyList();
        }
        List<ConnectorColumnHandle> selected = new ArrayList<>();
        for (SlotDescriptor slot : desc.getSlots()) {
            if (slot.getColumn() != null) {
                ConnectorColumnHandle ch = allHandles.get(slot.getColumn().getName());
                if (ch != null) {
                    selected.add(ch);
                }
            }
        }
        return selected;
    }

    /**
     * Builds a {@link ConnectorFilterConstraint} from the current conjuncts.
     */
    private ConnectorFilterConstraint buildFilterConstraint(List<Expr> exprs) {
        ConnectorExpression combined = ExprToConnectorExpressionConverter.convertConjuncts(exprs);
        return new ConnectorFilterConstraint(combined, Collections.emptyMap());
    }

    /**
     * Builds the remaining filter expression from unconsumed conjuncts.
     * If no conjuncts remain, returns {@link Optional#empty()}.
     * Filters out CAST-containing predicates when the connector does not support CAST pushdown.
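     * For example, a conjunct like {@code CAST(price AS BIGINT) > 10} (column name
     * illustrative) is excluded from the pushed filter and stays in the conjuncts
     * for BE-side evaluation.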
     */
    private Optional<ConnectorExpression> buildRemainingFilter() {
        if (conjuncts == null || conjuncts.isEmpty()) {
            return Optional.empty();
        }
        List<Expr> pushableConjuncts = conjuncts;
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        if (!metadata.supportsCastPredicatePushdown(connectorSession)) {
            pushableConjuncts = conjuncts.stream()
                    .filter(expr -> !containsCastExpr(expr))
                    .collect(Collectors.toList());
        }
        if (pushableConjuncts.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(ExprToConnectorExpressionConverter.convertConjuncts(pushableConjuncts));
    }

    private static boolean containsCastExpr(Expr expr) {
        List<CastExpr> castExprs = new ArrayList<>();
        expr.collect(CastExpr.class, castExprs);
        return !castExprs.isEmpty();
    }
}