// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSplitSource;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTextSerdeType;
import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc;
import org.apache.doris.thrift.TTransactionalHiveDesc;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * FileQueryScanNode for querying files from external catalogs.
 * Currently only Hive, Hudi, Iceberg and TVF are supported.
 */
public abstract class FileQueryScanNode extends FileScanNode {
    private static final Logger LOG = LogManager.getLogger(FileQueryScanNode.class);

    protected Map<String, SlotDescriptor> destSlotDescByName;
    protected TFileScanRangeParams params;

    @Getter
    protected TableSample tableSample;

    protected String brokerName;

    protected TableSnapshot tableSnapshot;
    // Save a reference to the session variable so that we don't need to get it from the connect context.
    // The connect context is a thread-local variable and is not available when running in another thread.
    protected SessionVariable sessionVariable;

    /**
     * External file scan node for querying HMS tables.
     * needCheckColumnPriv: some ExternalFileScanNodes do not need to check column privileges,
     * e.g. the S3 TVF. Such scan nodes have no corresponding catalog/database/table info,
     * so there is no need to do a privilege check.
     */
    public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
            StatisticalType statisticalType, boolean needCheckColumnPriv,
            SessionVariable sv) {
        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
        this.sessionVariable = sv;
    }

    @Override
    public void init(Analyzer analyzer) throws UserException {
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
        }
        super.init(analyzer);
        doInitialize();
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
        }
    }

    /**
     * Init ExternalFileScanNode, ONLY used for Nereids. Should NOT be used anywhere else.
     */
    @Override
    public void init() throws UserException {
        super.init();
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
        }
        doInitialize();
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
        }
    }

    // Init the scan provider and schema-related params.
    protected void doInitialize() throws UserException {
        Preconditions.checkNotNull(desc);
        if (desc.getTable() instanceof ExternalTable) {
            ExternalTable table = (ExternalTable) desc.getTable();
            if (table.isView()) {
                throw new AnalysisException(
                        String.format("Querying external view '%s.%s' is not supported", table.getDbName(),
                                table.getName()));
            }
        }
        initBackendPolicy();
        initSchemaParams();
    }

    // Init schema (Tuple/Slot) related params.
    protected void initSchemaParams() throws UserException {
        destSlotDescByName = Maps.newHashMap();
        for (SlotDescriptor slot : desc.getSlots()) {
            destSlotDescByName.put(slot.getColumn().getName(), slot);
        }
        params = new TFileScanRangeParams();
        if (this instanceof HiveScanNode) {
            params.setTextSerdeType(TTextSerdeType.HIVE_TEXT_SERDE);
        }
        params.setDestTupleId(desc.getId().asInt());
        List<String> partitionKeys = getPathPartitionKeys();
        List<Column> columns = desc.getTable().getBaseSchema(false);
        params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
        for (SlotDescriptor slot : desc.getSlots()) {
            if (!slot.isMaterialized()) {
                continue;
            }
            TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
            slotInfo.setSlotId(slot.getId().asInt());
            slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
            params.addToRequiredSlots(slotInfo);
        }
        setDefaultValueExprs(getTargetTable(), destSlotDescByName, null, params, false);
        setColumnPositionMapping();
        // For query, set src tuple id to -1.
        params.setSrcTupleId(-1);
    }

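    // Rebuild the required-slot list from the tuple descriptor. Called in doFinalize(),
    // after createScanRangeLocations(), because slot materialization may have changed
    // since initSchemaParams() ran.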
    private void updateRequiredSlots() throws UserException {
        params.unsetRequiredSlots();
        for (SlotDescriptor slot : desc.getSlots()) {
            if (!slot.isMaterialized()) {
                continue;
            }

            TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
            slotInfo.setSlotId(slot.getId().asInt());
            slotInfo.setIsFileSlot(!getPathPartitionKeys().contains(slot.getColumn().getName()));
            params.addToRequiredSlots(slotInfo);
        }
        // Update required slots and column_idxs in scanRangeLocations.
        setColumnPositionMapping();
    }

    public void setTableSample(TableSample tSample) {
        this.tableSample = tSample;
    }

    @Override
    public void finalize(Analyzer analyzer) throws UserException {
        doFinalize();
    }

    @Override
    public void finalizeForNereids() throws UserException {
        doFinalize();
    }

    // Create the scan range locations and update the related statistics.
    protected void doFinalize() throws UserException {
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setFinalizeScanNodeStartTime();
        }
        convertPredicate();
        createScanRangeLocations();
        updateRequiredSlots();
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setFinalizeScanNodeFinishTime();
        }
    }

    /**
     * Converts conjuncts into predicates understood by the corresponding data source.
     * Data sources that need predicate conversion should override this method.
     * It must be called in finalize, because in the Nereids planner
     * conjuncts are only generated in the finalize stage.
     */
    protected void convertPredicate() {
    }

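    // For each required file slot, record the position of its column in the table
    // schema (column_idxs), so the BE can locate the column inside the data file.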
    private void setColumnPositionMapping()
            throws UserException {
        TableIf tbl = getTargetTable();
        List<Integer> columnIdxs = Lists.newArrayList();
        // Avoid a null pointer: there may be no required slots, e.g. when two tables are joined.
        if (params.getRequiredSlots() == null) {
            params.setColumnIdxs(columnIdxs);
            return;
        }
        for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
            if (!slot.isIsFileSlot()) {
                continue;
            }
            SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
            String colName = slotDesc.getColumn().getName();
            if (colName.startsWith(Column.GLOBAL_ROWID_COL)) {
                continue;
            }

            int idx = -1;
            List<Column> columns = getColumns();
            for (int i = 0; i < columns.size(); i++) {
                if (columns.get(i).getName().equals(colName)) {
                    idx = i;
                    break;
                }
            }
            if (idx == -1) {
                throw new UserException("Column " + colName + " not found in table " + tbl.getName());
            }
            columnIdxs.add(idx);
        }
        params.setColumnIdxs(columnIdxs);
    }

    public TFileScanRangeParams getFileScanRangeParams() {
        return params;
    }

    // Set extra scan parameters to support different types of file data sources.
    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
    }

    // Serialize the table to be scanned and pass it to the BE's JNI reader.
    protected Optional<String> getSerializedTable() {
        return Optional.empty();
    }

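    // Two modes:
    // 1. Batch mode: splits are generated lazily; each backend gets a split source ID
    //    and pulls splits from the FE while scanning.
    // 2. Non-batch mode: all splits are generated up front and assigned to backends here.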
    @Override
    public void createScanRangeLocations() throws UserException {
        long start = System.currentTimeMillis();
        StmtExecutor executor = ConnectContext.get().getExecutor();
        if (executor != null) {
            executor.getSummaryProfile().setGetSplitsStartTime();
        }
        TFileFormatType fileFormatType = getFileFormatType();
        if (fileFormatType == TFileFormatType.FORMAT_ORC) {
            genSlotToSchemaIdMapForOrc();
        }
        params.setFormatType(fileFormatType);
        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
        boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
        if (isCsvOrJson || isWal) {
            params.setFileAttributes(getFileAttributes());
            if (isFileStreamType()) {
                params.setFileType(TFileType.FILE_STREAM);
                FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
                ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
                params.setCompressType(tableValuedFunction.getTFileCompressType());

                TScanRangeLocations curLocations = newLocations();
                TFileRangeDesc rangeDesc = new TFileRangeDesc();
                rangeDesc.setLoadId(ConnectContext.get().queryId());
                rangeDesc.setSize(-1);
                rangeDesc.setFileSize(-1);
                curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                curLocations.getScanRange().getExtScanRange().getFileScanRange().setParams(params);

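                // Pin the single scan range to the backend recorded in the connect
                // context, since the stream data is only available on that backend.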
                TScanRangeLocation location = new TScanRangeLocation();
                long backendId = ConnectContext.get().getBackendId();
                Backend backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(backendId);
                location.setBackendId(backendId);
                location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
                curLocations.addToLocations(location);
                scanRangeLocations.add(curLocations);
                scanBackendIds.add(backendId);
                return;
            }
        }

        Map<String, String> locationProperties = getLocationProperties();
        // for JNI, only need to set properties
        if (fileFormatType == TFileFormatType.FORMAT_JNI) {
            params.setProperties(locationProperties);
        }

        int numBackends = backendPolicy.numBackends();
        List<String> pathPartitionKeys = getPathPartitionKeys();
        if (isBatchMode()) {
            // File splits are generated lazily and fetched by backends while scanning.
            // Only the unique ID of the split source is provided to the backend.
            splitAssignment = new SplitAssignment(
                    backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
            splitAssignment.init();
            if (executor != null) {
                executor.getSummaryProfile().setGetSplitsFinishTime();
            }
            if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
                return;
            }
            FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
            TFileType locationType = fileSplit.getLocationType();
            selectedSplitNum = numApproximateSplits();
            if (selectedSplitNum < 0) {
                throw new UserException("Approximate split number should not be negative");
            }
            totalFileSize = fileSplit.getLength() * selectedSplitNum;
            long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime();
            // Not accurate, only used to estimate concurrency.
            // We must take the max with 1 because, in the case of multiple BEs,
            // `numApproximateSplits() / backendPolicy.numBackends()` may be 0,
            // which would make numSplitsPerBE 0 and result in no data being queried.
            int numSplitsPerBE = Math.max(selectedSplitNum / backendPolicy.numBackends(), 1);
            for (Backend backend : backendPolicy.getBackends()) {
                SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime);
                splitSources.add(splitSource);
                Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
                TScanRangeLocations curLocations = newLocations();
                TSplitSource tSource = new TSplitSource();
                tSource.setSplitSourceId(splitSource.getUniqueId());
                tSource.setNumSplits(numSplitsPerBE);
                curLocations.getScanRange().getExtScanRange().getFileScanRange().setSplitSource(tSource);
                TScanRangeLocation location = new TScanRangeLocation();
                location.setBackendId(backend.getId());
                location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
                curLocations.addToLocations(location);
                // There is only one scan range for each backend, so each backend
                // starts up only one ScanNode instance. Even a single instance,
                // however, can provide maximum scanning concurrency.
                scanRangeLocations.add(curLocations);
                setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
                scanBackendIds.add(backend.getId());
            }
        } else {
            List<Split> inputSplits = getSplits(numBackends);
            if (ConnectContext.get().getExecutor() != null) {
                ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
            }
            selectedSplitNum = inputSplits.size();
            if (inputSplits.isEmpty() && !isFileStreamType()) {
                return;
            }
            Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
            for (Backend backend : assignment.keySet()) {
                Collection<Split> splits = assignment.get(backend);
                for (Split split : splits) {
                    scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys));
                    totalFileSize += split.getLength();
                }
                scanBackendIds.add(backend.getId());
            }
        }

        getSerializedTable().ifPresent(params::setSerializedTable);

        if (executor != null) {
            executor.getSummaryProfile().setCreateScanRangeFinishTime();
            if (sessionVariable.showSplitProfileInfo()) {
                executor.getSummaryProfile().setAssignedWeightPerBackend(backendPolicy.getAssignedWeightPerBackend());
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("create #{} ScanRangeLocations cost: {} ms",
                    scanRangeLocations.size(), (System.currentTimeMillis() - start));
        }
    }

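    // Convert a single split into a TScanRangeLocations bound to the given backend,
    // filling in partition values, compression type and table-format params.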
    private TScanRangeLocations splitToScanRange(
            Backend backend,
            Map<String, String> locationProperties,
            Split split,
            List<String> pathPartitionKeys) throws UserException {
        FileSplit fileSplit = (FileSplit) split;
        TScanRangeLocations curLocations = newLocations();
        // If fileSplit has partition values, use the values collected from hive partitions.
        // Otherwise, use the values in file path.
        boolean isACID = false;
        if (fileSplit instanceof HiveSplit) {
            HiveSplit hiveSplit = (HiveSplit) fileSplit;
            isACID = hiveSplit.isACID();
        }
        List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
                ? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys,
                false, isACID) : fileSplit.getPartitionValues();

        TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
        TFileCompressType fileCompressType = getFileCompressType(fileSplit);
        rangeDesc.setCompressType(fileCompressType);
        if (fileSplit instanceof HiveSplit) {
            if (isACID) {
                HiveSplit hiveSplit = (HiveSplit) fileSplit;
                hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
                tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
                AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
                TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
                transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
                List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
                for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
                    TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
                    deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
                    deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
                    deleteDeltaDescs.add(deleteDeltaDesc);
                }
                transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
                tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
                rangeDesc.setTableFormatParams(tableFormatFileDesc);
            } else {
                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
                tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
                rangeDesc.setTableFormatParams(tableFormatFileDesc);
            }
        }

        // set file format type, and the type might fall back to native format in setScanParams
        rangeDesc.setFormatType(getFileFormatType());
        setScanParams(rangeDesc, fileSplit);
        curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
        TScanRangeLocation location = new TScanRangeLocation();
        setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties);
        location.setBackendId(backend.getId());
        location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
        curLocations.addToLocations(location);
        return curLocations;
    }

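    // Fill location-specific params (HDFS params, broker addresses, properties and
    // file type) once, on first use; later calls keep the values already set.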
    private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType,
            Map<String, String> locationProperties) throws UserException {
        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
            if (!params.isSetHdfsParams()) {
                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
                // tHdfsParams.setFsName(getFsName(fileSplit));
                params.setHdfsParams(tHdfsParams);
            }

            if (locationType == TFileType.FILE_BROKER) {
                params.setProperties(locationProperties);

                if (!params.isSetBrokerAddresses()) {
                    FsBroker broker;
                    if (brokerName != null) {
                        broker = Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost());
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(String.format(
                                    "Set location for broker [%s], selected BE host: [%s] selected broker host: [%s]",
                                    brokerName, selectedBackend.getHost(), broker.host));
                        }
                    } else {
                        broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
                    }
                    if (broker == null) {
                        throw new UserException("No alive broker.");
                    }
                    params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
                }
            }
        } else if ((locationType == TFileType.FILE_S3 || locationType == TFileType.FILE_LOCAL)
                && !params.isSetProperties()) {
            params.setProperties(locationProperties);
        }

        if (!params.isSetFileType()) {
            params.setFileType(locationType);
        }
    }

    private TScanRangeLocations newLocations() {
        // Generate one file scan range
        TFileScanRange fileScanRange = new TFileScanRange();
        // Scan range
        TExternalScanRange externalScanRange = new TExternalScanRange();
        externalScanRange.setFileScanRange(fileScanRange);
        TScanRange scanRange = new TScanRange();
        scanRange.setExtScanRange(externalScanRange);

        // Locations
        TScanRangeLocations locations = new TScanRangeLocations();
        locations.setScanRange(scanRange);
        return locations;
    }

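    // Build the per-range descriptor: offsets and sizes, partition columns parsed
    // from the path, the file type, and the fs name for HDFS paths.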
    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
                                               List<String> columnsFromPathKeys) {
        TFileRangeDesc rangeDesc = new TFileRangeDesc();
        rangeDesc.setStartOffset(fileSplit.getStart());
        rangeDesc.setSize(fileSplit.getLength());
        // fileSize is only used when the format is ORC or Parquet and TFileType is broker;
        // for other TFileTypes it is not necessary.
        rangeDesc.setFileSize(fileSplit.getFileLength());
        rangeDesc.setColumnsFromPath(columnsFromPath);
        rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);

        rangeDesc.setFileType(fileSplit.getLocationType());
        rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString());
        if (fileSplit.getLocationType() == TFileType.FILE_HDFS) {
            URI fileUri = fileSplit.getPath().getPath().toUri();
            rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
        }
        rangeDesc.setModificationTime(fileSplit.getModificationTime());
        return rangeDesc;
    }

    // To support Hive 1.x ORC internal column names like (_col0, _col1, _col2...),
    // we need to save the mapping from slot name to schema position.
    protected void genSlotToSchemaIdMapForOrc() {
        Preconditions.checkNotNull(params);
        List<Column> baseSchema = desc.getTable().getBaseSchema();
        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
        for (SlotDescriptor slot : desc.getSlots()) {
            int idx = 0;
            for (Column col : baseSchema) {
                if (col.getName().equals(slot.getColumn().getName())) {
                    columnNameToPosition.put(col.getName(), idx);
                    break;
                }
                idx += 1;
            }
        }
        params.setSlotNameToSchemaPos(columnNameToPosition);
    }

    @Override
    public int getScanRangeNum() {
        Preconditions.checkNotNull(scanRangeLocations);
        int i = 0;
        for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
            TScanRange tScanRange = tScanRangeLocations.getScanRange();
            TFileScanRange tFileScanRange = tScanRange.getExtScanRange().getFileScanRange();
            i += tFileScanRange.getRangesSize();
        }
        return i;
    }

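    // If storage data distribution is ignored, the instance count follows the
    // session's parallel instance number instead of the number of scan ranges.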
    @Override
    public int getNumInstances() {
        if (sessionVariable.isIgnoreStorageDataDistribution()) {
            return sessionVariable.getParallelExecInstanceNum();
        }
        return scanRangeLocations.size();
    }

    // Return true if this is a TFileType.FILE_STREAM scan.
    // Currently, only TVFScanNode may be of TFileType.FILE_STREAM type.
    protected boolean isFileStreamType() throws UserException {
        return false;
    }

    protected abstract TFileFormatType getFileFormatType() throws UserException;

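    // By default the compression type is inferred from the file path suffix;
    // subclasses may override this, e.g. to honor an explicitly configured type.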
    protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
        return Util.inferFileCompressTypeByPath(fileSplit.getPathString());
    }

    protected TFileAttributes getFileAttributes() throws UserException {
        throw new NotImplementedException("getFileAttributes is not implemented.");
    }

    protected abstract List<String> getPathPartitionKeys() throws UserException;

    protected abstract TableIf getTargetTable() throws UserException;

    // TODO: Rename this method when Metadata Service (MS) integration is complete.
    // The current name "getLocationProperties" is a placeholder and may not reflect
    // the new structure of storage parameters expected from MS.
    protected abstract Map<String, String> getLocationProperties() throws UserException;

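    // Stop lazy split generation and unregister this node's split sources from the manager.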
    @Override
    public void stop() {
        if (splitAssignment != null) {
            splitAssignment.stop();
            SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
            for (Long sourceId : splitAssignment.getSources()) {
                manager.removeSplitSource(sourceId);
            }
        }
    }

    public void setQueryTableSnapshot(TableSnapshot tableSnapshot) {
        this.tableSnapshot = tableSnapshot;
    }

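    // The snapshot specified on the table ref takes precedence over the one set
    // directly on this node.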
    public TableSnapshot getQueryTableSnapshot() {
        TableSnapshot snapshot = desc.getRef().getTableSnapshot();
        if (snapshot != null) {
            return snapshot;
        }
        return this.tableSnapshot;
    }

    /**
     * The real file split size is determined as follows:
     * 1. If the user specifies a split size in the session variable `file_split_size`, use that value.
     * 2. Otherwise, use the max of DEFAULT_SPLIT_SIZE and the block size.
     * @param blockSize the block size reported by the file system, e.g. HDFS
     * @return the real file split size
     */
    protected long getRealFileSplitSize(long blockSize) {
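        // e.g. with file_split_size unset (<= 0) and a 256 MB HDFS block size,
        // the split size is max(DEFAULT_SPLIT_SIZE, 256 MB).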
        long realSplitSize = sessionVariable.getFileSplitSize();
        if (realSplitSize <= 0) {
            realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
        }
        return realSplitSize;
    }
}