FileSplitter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileCompressType;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;

/**
 * Static helpers that cut a single file into one or more {@link Split}s.
 */
public class FileSplitter {
    private static final Logger LOG = LogManager.getLogger(FileSplitter.class);

    // A file keeps being cut into fileSplitSize-sized pieces only while the bytes still
    // remaining exceed this multiple of the split size; whatever is left becomes the last
    // split. The final split can therefore be up to 1.1 * fileSplitSize bytes long.
    private static final double SPLIT_SLOP = 1.1D;

    /**
     * Decides whether files should be split when count pushdown is used.
     *
     * <p>If the number of files is at least parallel instances * number of backends,
     * the files do not need to be split. Otherwise the files are split to avoid a
     * local shuffle.
     *
     * @param parallelism number of parallel instances
     * @param numBackends number of backends
     * @param totalFileNum total number of files
     * @return {@code true} if {@code totalFileNum} is smaller than
     *         {@code parallelism * numBackends}
     */
    public static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) {
        // Widen to long before multiplying: an int * int product can overflow (wrapping to
        // zero or a negative value) and would then make this comparison wrongly false.
        long totalInstances = (long) parallelism * numBackends;
        return totalFileNum < totalInstances;
    }

    /**
     * Cuts one file into splits.
     *
     * <p>When {@code splittable} is false, or the compress type inferred from the path is
     * not {@code PLAIN}, the whole file is returned as a single split. Otherwise the file
     * is cut into pieces of {@code fileSplitSize} bytes, followed by one trailing split
     * holding the remaining bytes (if any). A file of length 0 on the splittable path
     * yields no split at all.
     *
     * <p>A {@code fileSplitSize} that is not positive cannot be used to cut the file; in
     * that case no cutting is attempted and the remaining bytes go into a single split.
     *
     * @param path location of the file
     * @param fileSplitSize target size in bytes of each split
     * @param blockLocations block locations of the file, used to pick the hosts of each
     *        split; {@code null} is treated as an empty array
     * @param length total length of the file in bytes
     * @param modificationTime modification time handed through to the split creator
     * @param splittable whether the caller allows this file to be cut
     * @param partitionValues partition values handed through to the split creator
     * @param splitCreator factory used to build every split
     * @return the splits created for the file, in ascending offset order
     * @throws IOException propagated from the calls this method makes
     * @throws IllegalArgumentException if {@code blockLocations} is non-empty but none of
     *         its entries covers the start offset of a split
     */
    public static List<Split> splitFile(
            LocationPath path,
            long fileSplitSize,
            BlockLocation[] blockLocations,
            long length,
            long modificationTime,
            boolean splittable,
            List<String> partitionValues,
            SplitCreator splitCreator)
            throws IOException {
        if (blockLocations == null) {
            blockLocations = new BlockLocation[0];
        }
        List<Split> result = Lists.newArrayList();
        TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
        if (!splittable || compressType != TFileCompressType.PLAIN) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Path {} is not splittable.", path);
            }
            // The single split covering the whole file takes the hosts of the first block.
            String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
            result.add(splitCreator.create(path, 0, length, length, fileSplitSize,
                    modificationTime, hosts, partitionValues));
            return result;
        }

        long bytesRemaining = length;
        // Guard against a non-positive split size: with fileSplitSize == 0 the ratio below is
        // infinite and bytesRemaining would never shrink, so the loop would never terminate.
        if (fileSplitSize > 0) {
            while ((double) bytesRemaining / (double) fileSplitSize > SPLIT_SLOP) {
                long offset = length - bytesRemaining;
                String[] hosts = hostsForOffset(blockLocations, offset);
                result.add(splitCreator.create(path, offset, fileSplitSize,
                        length, fileSplitSize, modificationTime, hosts, partitionValues));
                bytesRemaining -= fileSplitSize;
            }
        }
        if (bytesRemaining != 0L) {
            // Trailing split: everything that is left after the fixed-size pieces.
            long offset = length - bytesRemaining;
            String[] hosts = hostsForOffset(blockLocations, offset);
            result.add(splitCreator.create(path, offset, bytesRemaining,
                    length, fileSplitSize, modificationTime, hosts, partitionValues));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Path {} includes {} splits.", path, result.size());
        }
        return result;
    }

    // Returns the hosts of the block that contains the given offset, or null when no
    // block locations are available.
    private static String[] hostsForOffset(BlockLocation[] blockLocations, long offset) throws IOException {
        int location = getBlockIndex(blockLocations, offset);
        return location == -1 ? null : blockLocations[location].getHosts();
    }

    /**
     * Finds the block that contains a file offset.
     *
     * @param blkLocations block locations to search
     * @param offset byte offset within the file
     * @return index of the first block whose range {@code [offset, offset + length)}
     *         contains {@code offset}, or -1 if {@code blkLocations} is null or empty
     * @throws IllegalArgumentException if block locations are present but none of them
     *         contains {@code offset}
     */
    private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
        if (blkLocations == null || blkLocations.length == 0) {
            return -1;
        }
        for (int i = 0; i < blkLocations.length; ++i) {
            if (blkLocations[i].getOffset() <= offset
                    && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
                return i;
            }
        }
        // The reported upper bound is the last byte covered by the final block location.
        BlockLocation last = blkLocations[blkLocations.length - 1];
        long fileLength = last.getOffset() + last.getLength() - 1L;
        throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
    }


}