MemoryFileSystem.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.fs;

import org.apache.doris.fs.io.DorisInput;
import org.apache.doris.fs.io.DorisInputFile;
import org.apache.doris.fs.io.DorisInputStream;
import org.apache.doris.fs.io.DorisOutputFile;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * In-memory {@link FileSystem} implementation for unit testing.
 * <p>
 * File data is stored in a {@link ConcurrentHashMap}. Directories are implicit
 * (any Location whose path ends with "/" is treated as a directory).
 * Thread-safe for concurrent read/write operations.
 */
public class MemoryFileSystem implements FileSystem {

    // Maps location string → file bytes (null entry = directory marker)
    private final ConcurrentHashMap<String, byte[]> store = new ConcurrentHashMap<>();

    @Override
    public DorisInputFile newInputFile(Location location) {
        return newInputFile(location, -1L);
    }

    @Override
    public DorisInputFile newInputFile(Location location, long length) {
        return new MemoryInputFile(location, length);
    }

    @Override
    public DorisOutputFile newOutputFile(Location location) {
        return new MemoryOutputFile(location);
    }

    @Override
    public boolean exists(Location location) {
        String key = location.toString();
        if (store.containsKey(key)) {
            return true;
        }
        String prefix = key.endsWith("/") ? key : key + "/";
        return store.keySet().stream().anyMatch(k -> k.startsWith(prefix));
    }

    @Override
    public void deleteFile(Location location) throws IOException {
        String key = location.toString();
        if (store.remove(key) == null) {
            throw new IOException("File not found: " + location);
        }
    }

    @Override
    public void renameFile(Location source, Location target) throws IOException {
        byte[] data = store.remove(source.toString());
        if (data == null) {
            throw new IOException("Source not found: " + source);
        }
        store.put(target.toString(), data);
    }

    @Override
    public void deleteDirectory(Location location) throws IOException {
        String prefix = location.toString();
        String withSlash = prefix.endsWith("/") ? prefix : prefix + "/";
        store.keySet().removeIf(k -> k.equals(prefix) || k.startsWith(withSlash));
    }

    @Override
    public void createDirectory(Location location) {
        store.putIfAbsent(location.toString() + "/", new byte[0]);
    }

    @Override
    public void renameDirectory(Location source, Location target) throws IOException {
        String srcPrefix = source.toString();
        String dstPrefix = target.toString();
        Map<String, byte[]> toMove = store.entrySet().stream()
                .filter(e -> e.getKey().startsWith(srcPrefix))
                .collect(Collectors.toMap(
                        e -> dstPrefix + e.getKey().substring(srcPrefix.length()),
                        Map.Entry::getValue));
        if (toMove.isEmpty()) {
            throw new IOException("Source directory not found: " + source);
        }
        store.keySet().removeIf(k -> k.startsWith(srcPrefix));
        store.putAll(toMove);
    }

    @Override
    public FileIterator listFiles(Location location, boolean recursive) {
        String prefix = location.toString();
        String withSlash = prefix.endsWith("/") ? prefix : prefix + "/";
        List<FileEntry> entries = store.keySet().stream()
                .filter(k -> k.startsWith(withSlash) && !k.equals(withSlash))
                .filter(k -> {
                    if (recursive) {
                        return true;
                    }
                    String relative = k.substring(withSlash.length());
                    return !relative.contains("/") || relative.endsWith("/");
                })
                .map(k -> {
                    Location loc = Location.of(k);
                    byte[] data = store.get(k);
                    return FileEntry.builder(loc)
                            .directory(k.endsWith("/"))
                            .length(data == null ? 0 : data.length)
                            .build();
                })
                .collect(Collectors.toList());
        return FileIterator.ofList(entries);
    }

    @Override
    public Set<Location> listDirectories(Location location) {
        String prefix = location.toString();
        String withSlash = prefix.endsWith("/") ? prefix : prefix + "/";
        Set<Location> dirs = new HashSet<>();
        for (String key : store.keySet()) {
            if (!key.startsWith(withSlash)) {
                continue;
            }
            String relative = key.substring(withSlash.length());
            int slash = relative.indexOf('/');
            if (slash >= 0) {
                dirs.add(Location.of(withSlash + relative.substring(0, slash + 1)));
            }
        }
        return Collections.unmodifiableSet(dirs);
    }

    @Override
    public void close() {
        // No-op: in-memory, nothing to release
    }

    /** Writes data directly into this filesystem (test helper). */
    public void put(Location location, byte[] data) {
        store.put(location.toString(), data);
    }

    /** Reads raw bytes from this filesystem (test helper). */
    public byte[] get(Location location) {
        return store.get(location.toString());
    }

    private class MemoryInputFile implements DorisInputFile {
        private final Location location;
        private final long knownLength;

        MemoryInputFile(Location location, long knownLength) {
            this.location = location;
            this.knownLength = knownLength;
        }

        @Override
        public Location location() {
            return location;
        }

        @Override
        public long length() throws IOException {
            if (knownLength >= 0) {
                return knownLength;
            }
            byte[] data = store.get(location.toString());
            if (data == null) {
                throw new IOException("File not found: " + location);
            }
            return data.length;
        }

        @Override
        public long lastModifiedTime() {
            return 0L;
        }

        @Override
        public boolean exists() {
            return store.containsKey(location.toString());
        }

        @Override
        public DorisInput newInput() throws IOException {
            throw new UnsupportedOperationException("Use newStream() for MemoryFileSystem");
        }

        @Override
        public DorisInputStream newStream() throws IOException {
            byte[] data = store.get(location.toString());
            if (data == null) {
                throw new IOException("File not found: " + location);
            }
            return new MemorySeekableInputStream(data);
        }
    }

    /** In-memory seekable stream over a fixed byte array. For testing only. */
    private static class MemorySeekableInputStream extends DorisInputStream {
        private final byte[] data;
        private int position;
        private boolean closed;

        MemorySeekableInputStream(byte[] data) {
            this.data = data;
        }

        @Override
        public long getPosition() throws IOException {
            return position;
        }

        @Override
        public void seek(long pos) throws IOException {
            if (pos < 0 || pos > data.length) {
                throw new IOException("Seek out of range [0, " + data.length + "]: " + pos);
            }
            position = (int) pos;
        }

        @Override
        public int read() throws IOException {
            if (closed) {
                throw new IOException("Stream closed");
            }
            if (position >= data.length) {
                return -1;
            }
            return data[position++] & 0xFF;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (closed) {
                throw new IOException("Stream closed");
            }
            if (position >= data.length) {
                return -1;
            }
            int n = Math.min(len, data.length - position);
            System.arraycopy(data, position, b, off, n);
            position += n;
            return n;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private class MemoryOutputFile implements DorisOutputFile {
        private final Location location;

        MemoryOutputFile(Location location) {
            this.location = location;
        }

        @Override
        public Location location() {
            return location;
        }

        @Override
        public OutputStream create() throws IOException {
            if (store.containsKey(location.toString())) {
                throw new IOException("File already exists: " + location);
            }
            return buildStream();
        }

        @Override
        public OutputStream createOrOverwrite() {
            return buildStream();
        }

        private OutputStream buildStream() {
            return new ByteArrayOutputStream() {
                @Override
                public void close() throws IOException {
                    super.close();
                    store.put(location.toString(), toByteArray());
                }
            };
        }
    }
}