FileSplitter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileCompressType;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Cuts a single file into a list of {@link Split}s.
 *
 * <p>Three strategies are used, in this order:
 * <ol>
 *   <li>files that are not splittable, or whose path implies a non-plain compress type, become one split;</li>
 *   <li>if the caller specifies a positive split size, the file is cut into fixed-size ranges;</li>
 *   <li>otherwise the file is cut along its block locations, with each block further divided by the current
 *       maximum split size.</li>
 * </ol>
 *
 * <p>The first {@code maxInitialSplitNum} requests for a size limit are answered with
 * {@code maxInitialSplitSize}; later requests are answered with {@code maxSplitSize}. The remaining budget is
 * kept in an {@link AtomicInteger} and the limit obtained for each split is held in a local variable, so
 * concurrent {@link #splitFile} calls do not overwrite each other's limit. The configuration setters are not
 * synchronized.
 */
public class FileSplitter {
    private static final Logger LOG = LogManager.getLogger(FileSplitter.class);

    // When splitting by a specified size, a tail of up to 10% more than the split size is kept in the last
    // split instead of producing one more tiny split.
    private static final double SPLIT_SLOP = 1.1D;

    /**
     * Decides whether files should be split for count pushdown.
     *
     * <p>If the number of files is at least parallel instances * num of backends, we don't need to split the
     * file. Otherwise, split the file to avoid local shuffle.
     *
     * @param parallelism number of parallel instances
     * @param numBackends number of backends
     * @param totalFileNum total number of files
     * @return {@code true} if {@code totalFileNum} is smaller than {@code parallelism * numBackends}
     */
    public static boolean needSplitForCountPushdown(int parallelism, int numBackends, long totalFileNum) {
        // Widen to long before multiplying so that large operands cannot overflow int.
        return totalFileNum < (long) parallelism * numBackends;
    }

    private long maxInitialSplitSize;

    private long maxSplitSize;

    private int maxInitialSplitNum;
    // Number of size-limit requests that may still be answered with maxInitialSplitSize.
    private final AtomicInteger remainingInitialSplitNum;

    public long getMaxInitialSplitSize() {
        return maxInitialSplitSize;
    }

    public void setMaxInitialSplitSize(long maxInitialSplitSize) {
        this.maxInitialSplitSize = maxInitialSplitSize;
    }

    public long getMaxSplitSize() {
        return maxSplitSize;
    }

    public void setMaxSplitSize(long maxSplitSize) {
        this.maxSplitSize = maxSplitSize;
    }

    public int maxInitialSplitNum() {
        return maxInitialSplitNum;
    }

    /**
     * Updates the configured number of initial splits. Note that this only changes the stored configuration
     * value; the remaining budget created by the constructor is not reset.
     */
    public void setMaxInitialSplits(int maxInitialSplitNum) {
        this.maxInitialSplitNum = maxInitialSplitNum;
    }

    public long getRemainingInitialSplitNum() {
        return remainingInitialSplitNum.get();
    }

    /**
     * @param maxInitialSplitSize size limit handed out while the initial-split budget lasts
     * @param maxSplitSize size limit handed out once the initial-split budget is used up
     * @param maxInitialSplitNum initial-split budget
     */
    public FileSplitter(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.maxInitialSplitNum = maxInitialSplitNum;
        remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum);
    }

    /**
     * Splits one file into {@link Split}s created through {@code splitCreator}.
     *
     * @param path location of the file
     * @param specifiedFileSplitSize if positive, the file is cut into ranges of exactly this size (the last
     *         range may be up to 10% larger, or smaller); otherwise block-based splitting is used
     * @param blockLocations block locations of the file; {@code null} is treated as a single block without
     *         hosts covering {@code [0, length)}
     * @param length length of the file in bytes
     * @param modificationTime modification time passed through to the created splits
     * @param splittable whether the file may be cut into more than one split
     * @param partitionValues partition values passed through to the created splits
     * @param splitCreator factory used to build every split
     * @return the splits of the file; may be empty for a zero-length splittable file
     * @throws IOException if the hosts of a block location cannot be obtained
     * @throws IllegalArgumentException if a specified-size split starts at an offset that no block covers
     */
    public List<Split> splitFile(
                LocationPath path,
                long specifiedFileSplitSize,
                BlockLocation[] blockLocations,
                long length,
                long modificationTime,
                boolean splittable,
                List<String> partitionValues,
                SplitCreator splitCreator)
                throws IOException {
        // Pass splitCreator.create() to set target file split size to calculate split weight.
        long targetFileSplitSize = specifiedFileSplitSize > 0 ? specifiedFileSplitSize : maxSplitSize;
        if (blockLocations == null) {
            blockLocations = new BlockLocation[1];
            blockLocations[0] = new BlockLocation(null, null, 0L, length);
        }
        List<Split> result = Lists.newArrayList();
        TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.getNormalizedLocation());
        if (!splittable || compressType != TFileCompressType.PLAIN) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Path {} is not splittable.", path);
            }
            String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
            result.add(splitCreator.create(path, 0, length, length,
                    targetFileSplitSize,
                    modificationTime, hosts, partitionValues));
            // A whole-file split still uses up one unit of the initial-split budget.
            acquireMaxSplitSize();
            return result;
        }

        // if specified split size is not zero, split file by specified size
        if (specifiedFileSplitSize > 0) {
            long bytesRemaining = length;
            while ((double) bytesRemaining / (double) specifiedFileSplitSize > SPLIT_SLOP) {
                long offset = length - bytesRemaining;
                result.add(splitCreator.create(path, offset, specifiedFileSplitSize,
                        length, specifiedFileSplitSize, modificationTime,
                        hostsAt(blockLocations, offset), partitionValues));
                bytesRemaining -= specifiedFileSplitSize;
            }
            if (bytesRemaining != 0L) {
                long offset = length - bytesRemaining;
                result.add(splitCreator.create(path, offset, bytesRemaining,
                        length, specifiedFileSplitSize, modificationTime,
                        hostsAt(blockLocations, offset), partitionValues));
            }
            return result;
        }

        // split file by block
        final long start = 0;
        final long end = start + length;
        List<InternalBlock> blocks = clampBlocks(blockLocations, start, length);
        if (blocks.isEmpty()) {
            result.add(splitCreator.create(path, 0, length, length,
                    targetFileSplitSize, modificationTime, null,
                    partitionValues));
            acquireMaxSplitSize();
            return result;
        }

        long splitStart = start;
        int currentBlockIdx = 0;
        while (splitStart < end) {
            // Keep the limit in a local: a shared field could be overwritten by a concurrent call
            // between the update and the reads below.
            long maxBytes = acquireMaxSplitSize();
            InternalBlock block = blocks.get(currentBlockIdx);
            long remainingBlockBytes = block.getEnd() - splitStart;
            long splitBytes;
            if (maxBytes <= 0 || remainingBlockBytes <= maxBytes) {
                // A non-positive limit cannot make progress, so the rest of the block becomes one split
                // instead of looping forever on zero-length splits.
                splitBytes = remainingBlockBytes;
            } else if (remainingBlockBytes - maxBytes <= maxBytes) {
                // Second to last split in this block, generate two evenly sized splits.
                // (Same test as maxBytes * 2 >= remainingBlockBytes, written so it cannot overflow.)
                splitBytes = remainingBlockBytes / 2;
            } else {
                splitBytes = maxBytes;
            }
            result.add(splitCreator.create(path, splitStart, splitBytes,
                    length, targetFileSplitSize, modificationTime, block.getHosts(),
                    partitionValues));
            splitStart += splitBytes;
            if (splitStart == block.getEnd()) {
                currentBlockIdx++;
                if (currentBlockIdx != blocks.size()) {
                    // Consecutive blocks must be contiguous.
                    Verify.verify(splitStart == blocks.get(currentBlockIdx).getStart());
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Path {} includes {} splits.", path, result.size());
        }
        return result;
    }

    /**
     * Takes one unit from the initial-split budget if any is left and returns the matching size limit.
     *
     * @return {@code maxInitialSplitSize} if a budget unit was taken, otherwise {@code maxSplitSize}
     */
    private long acquireMaxSplitSize() {
        int cur = remainingInitialSplitNum.get();
        while (cur > 0) {
            if (remainingInitialSplitNum.compareAndSet(cur, cur - 1)) {
                return maxInitialSplitSize;
            }
            // Lost the race against another decrement; re-read and retry.
            cur = remainingInitialSplitNum.get();
        }
        return maxSplitSize;
    }

    /**
     * Restricts every block location to the range {@code [start, start + length]} and drops the ones that
     * fall outside of it or that are empty after clamping.
     */
    private static List<InternalBlock> clampBlocks(BlockLocation[] blockLocations, long start, long length)
            throws IOException {
        long end = start + length;
        ImmutableList.Builder<InternalBlock> blockBuilder = ImmutableList.builder();
        for (BlockLocation blockLocation : blockLocations) {
            // clamp the block range
            long blockStart = Math.max(start, blockLocation.getOffset());
            long blockEnd = Math.min(end, blockLocation.getOffset() + blockLocation.getLength());
            if (blockStart > blockEnd) {
                // block is outside split range
                continue;
            }
            if (blockStart == blockEnd && !(blockStart == start && blockEnd == end)) {
                // skip zero-width block, except in the special circumstance:
                // slice is empty, and the block covers the empty slice interval.
                continue;
            }
            blockBuilder.add(new InternalBlock(blockStart, blockEnd, blockLocation.getHosts()));
        }
        return blockBuilder.build();
    }

    /**
     * Returns the hosts of the block that contains {@code offset}, or {@code null} if there are no block
     * locations at all.
     */
    private static String[] hostsAt(BlockLocation[] blockLocations, long offset) throws IOException {
        int location = getBlockIndex(blockLocations, offset);
        return location == -1 ? null : blockLocations[location].getHosts();
    }

    /**
     * Finds the index of the block location whose range contains {@code offset}.
     *
     * @return the index, or {@code -1} if {@code blkLocations} is {@code null} or empty
     * @throws IllegalArgumentException if block locations exist but none of them contains {@code offset}
     */
    private static int getBlockIndex(BlockLocation[] blkLocations, long offset) {
        if (blkLocations == null || blkLocations.length == 0) {
            return -1;
        }
        for (int i = 0; i < blkLocations.length; ++i) {
            if (blkLocations[i].getOffset() <= offset
                    && offset < blkLocations[i].getOffset() + blkLocations[i].getLength()) {
                return i;
            }
        }
        BlockLocation last = blkLocations[blkLocations.length - 1];
        long fileLength = last.getOffset() + last.getLength() - 1L;
        throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
    }

    /**
     * A block location clamped to the file range: the byte range {@code [start, end)} plus the hosts
     * reported for it.
     */
    private static class InternalBlock {
        private final long start;
        private final long end;
        private final String[] hosts;

        public InternalBlock(long start, long end, String[] hosts) {
            Preconditions.checkArgument(start <= end, "block end cannot be before block start");
            this.start = start;
            this.end = end;
            this.hosts = hosts;
        }

        public long getStart() {
            return start;
        }

        public long getEnd() {
            return end;
        }

        public String[] getHosts() {
            return hosts;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            InternalBlock that = (InternalBlock) o;
            return start == that.start && end == that.end && Arrays.equals(hosts, that.hosts);
        }

        @Override
        public int hashCode() {
            int result = Objects.hash(start, end);
            result = 31 * result + Arrays.hashCode(hosts);
            return result;
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("InternalBlock{");
            sb.append("start=").append(start);
            sb.append(", end=").append(end);
            sb.append(", hosts=").append(Arrays.toString(hosts));
            sb.append('}');
            return sb.toString();
        }
    }


}