HiveScanNode.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive.source;

import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fsv2.DirectoryLister;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TPushAggOp;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class HiveScanNode extends FileQueryScanNode {
    private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);

    protected final HMSExternalTable hmsTable;
    private HiveTransaction hiveTransaction = null;

    // Will only be set in Nereids. For the legacy planner, it should be null.
    @Setter
    protected SelectedPartitions selectedPartitions = null;

    private DirectoryLister directoryLister;

    private boolean partitionInit = false;
    private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
    private List<HivePartition> prunedPartitions;
    private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
    private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);

    private boolean skipCheckingAcidVersionFile = false;

    /**
     * External file scan node for querying Hive tables.
     * needCheckColumnPriv: some ExternalFileScanNodes do not need to check column privileges,
     * e.g. the s3 tvf.
     * Such scan nodes have no corresponding catalog/database/table info, so no privilege check is needed.
     */
    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv,
            DirectoryLister directoryLister) {
        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv, directoryLister);
    }

    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
            StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv,
            DirectoryLister directoryLister) {
        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv);
        hmsTable = (HMSExternalTable) desc.getTable();
        brokerName = hmsTable.getCatalog().bindBrokerName();
        this.directoryLister = directoryLister;
    }

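    // For Hive transactional tables, register a Hive transaction with the transaction manager so
    // that valid write ids can be fetched later when listing ACID files, and read from the session
    // whether checking the ACID version file should be skipped.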
    @Override
    protected void doInitialize() throws UserException {
        super.doInitialize();

        if (hmsTable.isHiveTransactionalTable()) {
            this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
                    ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
            Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
            skipCheckingAcidVersionFile = sessionVariable.skipCheckingAcidVersionFile;
        }
    }

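    // Resolve the partitions to scan.
    // For partitioned tables, use the partitions already pruned by Nereids and fetch their metadata
    // from the HMS cache; for non-partitioned tables, build a single dummy partition that carries
    // the table's location and input format.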
    protected List<HivePartition> getPartitions() throws AnalysisException {
        List<HivePartition> resPartitions = Lists.newArrayList();
        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
        List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
        if (!partitionColumnTypes.isEmpty()) {
            // partitioned table
            Collection<PartitionItem> partitionItems;
            // Partitions have been pruned by Nereids in PruneFileScanPartition,
            // so just use the selected partitions.
            this.totalPartitionNum = selectedPartitions.totalPartitionNum;
            partitionItems = selectedPartitions.selectedPartitions.values();
            Preconditions.checkNotNull(partitionItems);
            this.selectedPartitionNum = partitionItems.size();

            // get partitions from cache
            List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(partitionItems.size());
            for (PartitionItem item : partitionItems) {
                partitionValuesList.add(
                        ((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
            }
            resPartitions = cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(),
                    partitionValuesList);
        } else {
            // Non-partitioned table: create a dummy partition to carry the location and input format
            // so that the interface can be unified.
            HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
                    hmsTable.getRemoteTable().getSd().getInputFormat(),
                    hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
            this.totalPartitionNum = 1;
            this.selectedPartitionNum = 1;
            resPartitions.add(dummyPartition);
        }
        if (ConnectContext.get().getExecutor() != null) {
            ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionsFinishTime();
        }
        return resPartitions;
    }

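    // Eagerly generate all splits (non-batch mode): resolve the pruned partitions once, list the
    // files of every partition, and convert them into splits in a single pass.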
    @Override
    public List<Split> getSplits(int numBackends) throws UserException {
        long start = System.currentTimeMillis();
        try {
            if (!partitionInit) {
                prunedPartitions = getPartitions();
                partitionInit = true;
            }
            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                    .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
            List<Split> allFiles = Lists.newArrayList();
            getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends);
            if (ConnectContext.get().getExecutor() != null) {
                ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                        allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
                        (System.currentTimeMillis() - start));
            }
            return allFiles;
        } catch (Throwable t) {
            LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
            throw new UserException(
                "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
                t);
        }
    }

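    // Produce splits asynchronously in batch mode: each pruned partition is listed and split on the
    // schedule executor, with at most NUM_SPLITTERS_ON_FLIGHT partitions in flight. Generated splits
    // are pushed to the split assignment queue; the first error is recorded in batchException and
    // propagated to the split assignment.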
    @Override
    public void startSplit(int numBackends) {
        if (prunedPartitions.isEmpty()) {
            splitAssignment.finishSchedule();
            return;
        }
        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
        Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
        CompletableFuture.runAsync(() -> {
            for (HivePartition partition : prunedPartitions) {
                if (batchException.get() != null || splitAssignment.isStop()) {
                    break;
                }
                try {
                    splittersOnFlight.acquire();
                    CompletableFuture.runAsync(() -> {
                        try {
                            List<Split> allFiles = Lists.newArrayList();
                            getFileSplitByPartitions(
                                    cache, Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
                            if (allFiles.size() > numSplitsPerPartition.get()) {
                                numSplitsPerPartition.set(allFiles.size());
                            }
                            if (splitAssignment.needMoreSplit()) {
                                splitAssignment.addToQueue(allFiles);
                            }
                        } catch (Exception e) {
                            batchException.set(new UserException(e.getMessage(), e));
                        } finally {
                            splittersOnFlight.release();
                            if (batchException.get() != null) {
                                splitAssignment.setException(batchException.get());
                            }
                            if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
                                splitAssignment.finishSchedule();
                            }
                        }
                    }, scheduleExecutor);
                } catch (Exception e) {
                    // When submitting a task, an exception will be thrown if the task pool (scheduleExecutor) is full.
                    batchException.set(new UserException(e.getMessage(), e));
                    break;
                }
            }
            if (batchException.get() != null) {
                splitAssignment.setException(batchException.get());
            }
        });
    }

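    // Batch mode is used when the number of pruned partitions reaches the threshold returned by
    // sessionVariable.getNumPartitionsInBatchMode(); splits are then generated asynchronously in startSplit().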
    @Override
    public boolean isBatchMode() {
        if (!partitionInit) {
            try {
                prunedPartitions = getPartitions();
            } catch (Exception e) {
                return false;
            }
            partitionInit = true;
        }
        int numPartitions = sessionVariable.getNumPartitionsInBatchMode();
        return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
    }

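    // Rough estimate of the total split count: the largest number of splits seen so far for a single
    // partition (initially NUM_SPLITS_PER_PARTITION) multiplied by the number of pruned partitions.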
    @Override
    public int numApproximateSplits() {
        return numSplitsPerPartition.get() * prunedPartitions.size();
    }

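    // List the files of the given partitions and convert them into splits.
    // Transactional tables are listed through the Hive transaction's valid write ids; other tables
    // go through the HMS file cache. Table sampling and COUNT(*) pushdown change how files are split.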
    private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
            List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException {
        List<FileCacheValue> fileCaches;
        if (hiveTransaction != null) {
            try {
                fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
            } catch (Exception e) {
                // Release the shared lock (getValidWriteIds acquires a lock).
                // If no exception is thrown, the lock will be released in `finalizeQuery()`.
                Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
                throw e;
            }
        } else {
            boolean withCache = Config.max_external_file_cache_num > 0;
            fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName,
                    directoryLister, hmsTable);
        }
        if (tableSample != null) {
            List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
            splitAllFiles(allFiles, hiveFileStatuses);
            return;
        }

        /**
         * If the pushed-down aggregation operator is COUNT,
         * we don't need to split the files, because for the parquet/orc formats only metadata is read.
         * Splitting would read the metadata of a file multiple times, which is inefficient.
         *
         * - Hive full ACID transactional tables may need merge-on-read, so this optimization is not applied to them.
         * - If the file format is not parquet/orc, e.g. text, the files still need to be split to increase parallelism.
         */
        boolean needSplit = true;
        if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT
                && !(hmsTable.isHiveTransactionalTable() && hmsTable.isFullAcidTable())) {
            int totalFileNum = 0;
            for (FileCacheValue fileCacheValue : fileCaches) {
                if (fileCacheValue.getFiles() != null) {
                    totalFileNum += fileCacheValue.getFiles().size();
                }
            }
            int parallelNum = sessionVariable.getParallelExecInstanceNum();
            needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum);
        }
        for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
            if (fileCacheValue.getFiles() != null) {
                boolean isSplittable = fileCacheValue.isSplittable();
                for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                    allFiles.addAll(FileSplitter.splitFile(status.getPath(),
                            // set block size to Long.MAX_VALUE to avoid splitting the file.
                            getRealFileSplitSize(needSplit ? status.getBlockSize() : Long.MAX_VALUE),
                            status.getBlockLocations(), status.getLength(), status.getModificationTime(),
                            isSplittable, fileCacheValue.getPartitionValues(),
                            new HiveSplitCreator(fileCacheValue.getAcidInfo())));
                }
            }
        }
    }

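    // Split the files chosen by table sampling; each file is split according to its real block size,
    // without the COUNT(*) pushdown optimization.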
    private void splitAllFiles(List<Split> allFiles,
                               List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
        for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
            allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()),
                    status.getBlockLocations(), status.getLength(), status.getModificationTime(),
                    status.isSplittable(), status.getPartitionValues(),
                    new HiveSplitCreator(status.getAcidInfo())));
        }
    }

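    // Select files for TABLESAMPLE: shuffle all files with a Random seeded by the sample seek value,
    // then take a prefix of the shuffled list until the target sample size in bytes is reached
    // (either a percentage of the total size or an estimate derived from the row count).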
    private List<HiveMetaStoreCache.HiveFileStatus> selectFiles(List<FileCacheValue> inputCacheValue) {
        List<HiveMetaStoreCache.HiveFileStatus> fileList = Lists.newArrayList();
        long totalSize = 0;
        for (FileCacheValue value : inputCacheValue) {
            for (HiveMetaStoreCache.HiveFileStatus file : value.getFiles()) {
                file.setSplittable(value.isSplittable());
                file.setPartitionValues(value.getPartitionValues());
                file.setAcidInfo(value.getAcidInfo());
                fileList.add(file);
                totalSize += file.getLength();
            }
        }
        long sampleSize = 0;
        if (tableSample.isPercent()) {
            sampleSize = totalSize * tableSample.getSampleValue() / 100;
        } else {
            long estimatedRowSize = 0;
            for (Column column : hmsTable.getFullSchema()) {
                estimatedRowSize += column.getDataType().getSlotSize();
            }
            sampleSize = estimatedRowSize * tableSample.getSampleValue();
        }
        long selectedSize = 0;
        Collections.shuffle(fileList, new Random(tableSample.getSeek()));
        int index = 0;
        for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
            selectedSize += file.getLength();
            index += 1;
            if (selectedSize >= sampleSize) {
                break;
            }
        }
        return fileList.subList(0, index);
    }

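    // List files of a transactional table: register every partition with the Hive transaction,
    // fetch the valid write ids from the metastore, and list the ACID files accordingly.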
    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions,
                                                           String bindBrokerName) {
        for (HivePartition partition : partitions) {
            if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
                // This is an unpartitioned table.
                continue;
            }
            hiveTransaction.addPartition(partition.getPartitionName(hmsTable.getPartitionColumns()));
        }
        Map<String, String> txnValidIds = hiveTransaction.getValidWriteIds(
                ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());

        return cache.getFilesByTransaction(partitions, txnValidIds, hiveTransaction.isFullAcid(), bindBrokerName);
    }

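    // Partition values are encoded in the file paths for Hive tables, so return the lower-cased
    // partition column names (ignoring empty names).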
    @Override
    public List<String> getPathPartitionKeys() {
        return hmsTable.getRemoteTable().getPartitionKeys().stream()
                .map(FieldSchema::getName).filter(partitionKey -> !"".equals(partitionKey))
                .map(String::toLowerCase).collect(Collectors.toList());
    }

    @Override
    public TableIf getTargetTable() {
        return hmsTable;
    }

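    // Map the Hive input format and SerDe to a Doris file format type. Text tables using a JSON
    // SerDe are read as JSON; with read_hive_json_in_one_column enabled, OpenX JSON tables are read
    // as plain text into a single string column (the first column must be a string).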
    @Override
    public TFileFormatType getFileFormatType() throws UserException {
        TFileFormatType type = null;
        Table table = hmsTable.getRemoteTable();
        String inputFormatName = table.getSd().getInputFormat();
        String hiveFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
        if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) {
            type = TFileFormatType.FORMAT_PARQUET;
        } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
            type = TFileFormatType.FORMAT_ORC;
        } else if (hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc())) {
            String serDeLib = table.getSd().getSerdeInfo().getSerializationLib();
            if (serDeLib.equals(HiveMetaStoreClientHelper.HIVE_JSON_SERDE)
                    || serDeLib.equals(HiveMetaStoreClientHelper.LEGACY_HIVE_JSON_SERDE)) {
                type = TFileFormatType.FORMAT_JSON;
            } else if (serDeLib.equals(HiveMetaStoreClientHelper.OPENX_JSON_SERDE)) {
                if (!sessionVariable.isReadHiveJsonInOneColumn()) {
                    type = TFileFormatType.FORMAT_JSON;
                } else if (sessionVariable.isReadHiveJsonInOneColumn()
                        && hmsTable.firstColumnIsString()) {
                    type = TFileFormatType.FORMAT_CSV_PLAIN;
                } else {
                    throw new UserException("You set read_hive_json_in_one_column = true, but the first column of "
                            + "table " + hmsTable.getName()
                            + " is not a string column.");
                }
            } else {
                type = TFileFormatType.FORMAT_CSV_PLAIN;
            }
        }
        return type;
    }

    @Override
    protected Map<String, String> getLocationProperties() throws UserException {
        return hmsTable.getHadoopProperties();
    }

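    // Build the text scan attributes according to the table's SerDe: LazySimpleSerDe and
    // OpenCSVSerde map to text/CSV parameters, while the hcatalog and OpenX JSON SerDes map to
    // line-delimited JSON parameters. Unsupported SerDes are rejected.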
    @Override
    protected TFileAttributes getFileAttributes() throws UserException {
        TFileAttributes fileAttributes = new TFileAttributes();
        Table table = hmsTable.getRemoteTable();
        // set skip header count
        // TODO: support skip footer count
        fileAttributes.setSkipLines(HiveProperties.getSkipHeaderCount(table));
        // TODO: separate hive text table and OpenCsv table
        String serDeLib = table.getSd().getSerdeInfo().getSerializationLib();
        if (serDeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
            TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
            // set properties of LazySimpleSerDe
            // 1. set column separator
            textParams.setColumnSeparator(HiveProperties.getFieldDelimiter(table));
            // 2. set line delimiter
            textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table));
            // 3. set mapkv delimiter
            textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table));
            // 4. set collection delimiter
            textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table));
            // 5. set escape delimiter
            HiveProperties.getEscapeDelimiter(table).ifPresent(d -> textParams.setEscape(d.getBytes()[0]));
            // 6. set null format
            textParams.setNullFormat(HiveProperties.getNullFormat(table));
            fileAttributes.setTextParams(textParams);
            fileAttributes.setHeaderType("");
            fileAttributes.setEnableTextValidateUtf8(
                    sessionVariable.enableTextValidateUtf8);
        } else if (serDeLib.equals("org.apache.hadoop.hive.serde2.OpenCSVSerde")) {
            TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
            // set properties of OpenCSVSerde
            // 1. set column separator
            textParams.setColumnSeparator(HiveProperties.getSeparatorChar(table));
            // 2. set line delimiter
            textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table));
            // 3. set enclose char
            textParams.setEnclose(HiveProperties.getQuoteChar(table).getBytes()[0]);
            // 4. set escape char
            textParams.setEscape(HiveProperties.getEscapeChar(table).getBytes()[0]);
            fileAttributes.setTextParams(textParams);
            fileAttributes.setHeaderType("");
            if (textParams.isSetEnclose()) {
                fileAttributes.setTrimDoubleQuotes(true);
            }
            fileAttributes.setEnableTextValidateUtf8(
                    sessionVariable.enableTextValidateUtf8);
        } else if (serDeLib.equals("org.apache.hive.hcatalog.data.JsonSerDe")) {
            TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
            textParams.setColumnSeparator("\t");
            textParams.setLineDelimiter("\n");
            fileAttributes.setTextParams(textParams);

            fileAttributes.setJsonpaths("");
            fileAttributes.setJsonRoot("");
            fileAttributes.setNumAsString(true);
            fileAttributes.setFuzzyParse(false);
            fileAttributes.setReadJsonByLine(true);
            fileAttributes.setStripOuterArray(false);
            fileAttributes.setHeaderType("");
        } else if (serDeLib.equals("org.openx.data.jsonserde.JsonSerDe")) {
            if (!sessionVariable.isReadHiveJsonInOneColumn()) {
                TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
                textParams.setColumnSeparator("\t");
                textParams.setLineDelimiter("\n");
                fileAttributes.setTextParams(textParams);

                fileAttributes.setJsonpaths("");
                fileAttributes.setJsonRoot("");
                fileAttributes.setNumAsString(true);
                fileAttributes.setFuzzyParse(false);
                fileAttributes.setReadJsonByLine(true);
                fileAttributes.setStripOuterArray(false);
                fileAttributes.setHeaderType("");

                fileAttributes.setOpenxJsonIgnoreMalformed(
                        Boolean.parseBoolean(HiveProperties.getOpenxJsonIgnoreMalformed(table)));
            } else if (sessionVariable.isReadHiveJsonInOneColumn()
                    && hmsTable.firstColumnIsString()) {
                TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
                textParams.setLineDelimiter("\n");
                textParams.setColumnSeparator("\n");
                // Split rows on `\n` first. Since a row contains no further `\n`, column splitting
                // produces a single column holding the whole JSON line.
                fileAttributes.setTextParams(textParams);
                fileAttributes.setHeaderType("");
            } else {
                throw new UserException("You set read_hive_json_in_one_column = true, but the first column of table "
                        + hmsTable.getName()
                        + " is not a string column.");
            }
        } else {
            throw new UserException(
                    "unsupported hive table serde: " + serDeLib);
        }

        return fileAttributes;
    }

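    // Only COUNT without GROUP BY can be pushed down to this scan node.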
    @Override
    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
        String aggFunctionName = aggExpr.getFnName().getFunction();
        return aggFunctionName.equalsIgnoreCase("COUNT");
    }

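    // Only non-nullable columns are eligible for the COUNT pushdown, since COUNT(col) must not count NULL values.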
    @Override
    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) {
        return !col.isAllowNull();
    }

    @Override
    protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
        TFileCompressType compressType = super.getFileCompressType(fileSplit);
        // Hadoop uses the LZ4 block codec, so map LZ4 frame to LZ4 block.
        if (compressType == TFileCompressType.LZ4FRAME) {
            compressType = TFileCompressType.LZ4BLOCK;
        }
        return compressType;
    }
}