DelegateOutputFile.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.fileio;

import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.io.DorisOutputFile;
import org.apache.doris.fs.io.ParsedPath;

import com.google.common.io.CountingOutputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Objects;

/**
 * DelegateOutputFile is an implementation of the Iceberg OutputFile interface.
 * It wraps a DorisOutputFile and delegates file output operations to it, providing
 * integration between Doris file system and Iceberg's file IO abstraction.
 */
public class DelegateOutputFile implements OutputFile {
    /**
     * The underlying Doris file system used for file operations.
     */
    private final FileSystem fileSystem;
    /**
     * The DorisOutputFile instance representing the output file.
     */
    private final DorisOutputFile outputFile;

    /**
     * Constructs a DelegateOutputFile with the specified FileSystem and DorisPath.
     *
     * @param fileSystem the Doris file system to delegate operations to
     * @param path the DorisPath representing the file location
     */
    public DelegateOutputFile(FileSystem fileSystem, ParsedPath path) {
        this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem is null");
        this.outputFile = fileSystem.newOutputFile(path);
    }

    // ===================== File Creation Methods =====================

    /**
     * Creates a new file for writing. Throws UncheckedIOException if creation fails.
     *
     * @return a PositionOutputStream for writing to the file
     */
    @Override
    public PositionOutputStream create() {
        try {
            return new CountingPositionOutputStream(outputFile.create());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create file: " + location(), e);
        }
    }

    /**
     * Creates or overwrites a file for writing. Throws UncheckedIOException if creation fails.
     *
     * @return a PositionOutputStream for writing to the file
     */
    @Override
    public PositionOutputStream createOrOverwrite() {
        try {
            return new CountingPositionOutputStream(outputFile.createOrOverwrite());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create file: " + location(), e);
        }
    }

    // ===================== File Information Methods =====================

    /**
     * Returns the location (path) of the file as a string.
     *
     * @return the file location
     */
    @Override
    public String location() {
        return outputFile.path().toString();
    }

    /**
     * Converts this output file to an InputFile for reading.
     *
     * @return an InputFile instance for the same file
     */
    @Override
    public InputFile toInputFile() {
        return new DelegateInputFile(fileSystem.newInputFile(outputFile.path()));
    }

    // ===================== Object Methods =====================

    /**
     * Returns a string representation of this DelegateOutputFile.
     *
     * @return string representation
     */
    @Override
    public String toString() {
        return outputFile.toString();
    }

    /**
     * CountingPositionOutputStream is a wrapper around OutputStream that tracks the number of bytes written.
     * It extends PositionOutputStream to provide position tracking for Iceberg.
     */
    private static class CountingPositionOutputStream extends PositionOutputStream {
        /**
         * The underlying CountingOutputStream that wraps the actual OutputStream.
         */
        private final CountingOutputStream stream;

        /**
         * Constructs a CountingPositionOutputStream with the specified OutputStream.
         *
         * @param stream the OutputStream to wrap
         */
        private CountingPositionOutputStream(OutputStream stream) {
            this.stream = new CountingOutputStream(stream);
        }

        /**
         * Returns the current position (number of bytes written).
         *
         * @return the number of bytes written
         */
        @Override
        public long getPos() {
            return stream.getCount();
        }

        /**
         * Writes a single byte to the output stream.
         *
         * @param b the byte to write
         * @throws IOException if an I/O error occurs
         */
        @Override
        public void write(int b) throws IOException {
            stream.write(b);
        }

        /**
         * Writes a portion of a byte array to the output stream.
         *
         * @param b the byte array
         * @param off the start offset
         * @param len the number of bytes to write
         * @throws IOException if an I/O error occurs
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            stream.write(b, off, len);
        }

        /**
         * Flushes the output stream.
         *
         * @throws IOException if an I/O error occurs
         */
        @Override
        public void flush() throws IOException {
            stream.flush();
        }

        /**
         * Closes the output stream.
         *
         * @throws IOException if an I/O error occurs
         */
        @Override
        public void close() throws IOException {
            stream.close();
        }
    }
}