DelegateFileIO.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg.fileio;
import org.apache.doris.backup.Status;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.io.ParsedPath;
import com.google.common.collect.Iterables;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* DelegateFileIO is an implementation of the Iceberg SupportsBulkOperations interface.
* It delegates file operations (such as input, output, and deletion) to the underlying Doris FileSystem.
* This class is responsible for bridging Doris file system operations with Iceberg's file IO abstraction.
*/
public class DelegateFileIO implements SupportsBulkOperations {
private static final int DELETE_BATCH_SIZE = 1000;
/**
* Properties used to initialize the file system.
*/
private Map<String, String> properties;
/**
* The underlying Doris file system used for file operations.
*/
private FileSystem fileSystem;
/**
* Default constructor.
*/
public DelegateFileIO() {
}
/**
* Constructor with a specified FileSystem.
*
* @param fileSystem the Doris file system to delegate operations to
*/
public DelegateFileIO(FileSystem fileSystem) {
this.fileSystem = Objects.requireNonNull(fileSystem, "fileSystem is null");
}
// ===================== File Creation Methods =====================
/**
* Creates a new InputFile for the given path.
*
* @param path the file path
* @return an InputFile instance
*/
@Override
public InputFile newInputFile(String path) {
return new DelegateInputFile(fileSystem.newInputFile(new ParsedPath(path)));
}
/**
* Creates a new InputFile for the given path and length.
*
* @param path the file path
* @param length the file length
* @return an InputFile instance
*/
@Override
public InputFile newInputFile(String path, long length) {
return new DelegateInputFile(fileSystem.newInputFile(new ParsedPath(path), length));
}
/**
* Creates a new OutputFile for the given path.
*
* @param path the file path
* @return an OutputFile instance
*/
@Override
public OutputFile newOutputFile(String path) {
return new DelegateOutputFile(fileSystem, new ParsedPath(path));
}
// ===================== File Deletion Methods =====================
/**
* Deletes a file at the specified path.
* Throws UncheckedIOException if deletion fails.
*
* @param path the file path to delete
*/
@Override
public void deleteFile(String path) {
Status status = fileSystem.delete(path);
if (!status.ok()) {
throw new UncheckedIOException(
new IOException("Failed to delete file: " + path + ", " + status.toString()));
}
}
/**
* Deletes a file represented by an InputFile.
* Delegates to the default implementation in SupportsBulkOperations.
*
* @param file the InputFile to delete
*/
@Override
public void deleteFile(InputFile file) {
SupportsBulkOperations.super.deleteFile(file);
}
/**
* Deletes a file represented by an OutputFile.
* Delegates to the default implementation in SupportsBulkOperations.
*
* @param file the OutputFile to delete
*/
@Override
public void deleteFile(OutputFile file) {
SupportsBulkOperations.super.deleteFile(file);
}
/**
* Deletes multiple files in batches.
* Throws BulkDeletionFailureException if any batch fails.
*
* @param pathsToDelete iterable of file paths to delete
* @throws BulkDeletionFailureException if deletion fails for any batch
*/
@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
Iterable<List<String>> partitions = Iterables.partition(pathsToDelete, DELETE_BATCH_SIZE);
partitions.forEach(this::deleteBatch);
}
/**
* Helper method to delete a batch of files.
* Throws UncheckedIOException if deletion fails.
*
* @param filesToDelete list of file paths to delete
*/
private void deleteBatch(List<String> filesToDelete) {
Status status = fileSystem.deleteAll(filesToDelete);
if (!status.ok()) {
throw new UncheckedIOException(new IOException("Failed to delete some or all files: " + status.toString()));
}
}
// ===================== Manifest/Data/Delete File Methods =====================
/**
* Creates a new InputFile from a ManifestFile.
* Delegates to the default implementation in SupportsBulkOperations.
*
* @param manifest the ManifestFile
* @return an InputFile instance
*/
@Override
public InputFile newInputFile(ManifestFile manifest) {
return SupportsBulkOperations.super.newInputFile(manifest);
}
/**
* Creates a new InputFile from a DataFile.
* Delegates to the default implementation in SupportsBulkOperations.
*
* @param file the DataFile
* @return an InputFile instance
*/
@Override
public InputFile newInputFile(DataFile file) {
return SupportsBulkOperations.super.newInputFile(file);
}
/**
* Creates a new InputFile from a DeleteFile.
* Delegates to the default implementation in SupportsBulkOperations.
*
* @param file the DeleteFile
* @return an InputFile instance
*/
@Override
public InputFile newInputFile(DeleteFile file) {
return SupportsBulkOperations.super.newInputFile(file);
}
// ===================== Properties and Initialization =====================
/**
* Returns the properties used to initialize the file system.
*
* @return the properties map
*/
@Override
public Map<String, String> properties() {
return properties;
}
/**
* Initializes the file system with the given properties.
*
* @param properties the properties map
*/
@Override
public void initialize(Map<String, String> properties) {
StorageProperties storageProperties = StorageProperties.createPrimary(properties);
this.fileSystem = FileSystemFactory.get(storageProperties);
this.properties = properties;
}
/**
* Closes the file IO and releases any resources if necessary.
* No-op in this implementation.
*/
@Override
public void close() {
}
}