IcebergExpireSnapshotsAction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.expressions.Expression;

import com.google.common.collect.Lists;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Iceberg expire snapshots action implementation.
 * This action removes old snapshots from Iceberg tables to free up storage
 * space
 * and improve metadata performance.
 */
public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
    private static final Logger LOG = LogManager.getLogger(IcebergExpireSnapshotsAction.class);
    public static final String OLDER_THAN = "older_than";
    public static final String RETAIN_LAST = "retain_last";
    public static final String MAX_CONCURRENT_DELETES = "max_concurrent_deletes";
    public static final String SNAPSHOT_IDS = "snapshot_ids";
    public static final String CLEAN_EXPIRED_METADATA = "clean_expired_metadata";

    public IcebergExpireSnapshotsAction(Map<String, String> properties,
            Optional<PartitionNamesInfo> partitionNamesInfo,
            Optional<Expression> whereCondition) {
        super("expire_snapshots", properties, partitionNamesInfo, whereCondition);
    }

    @Override
    protected void registerIcebergArguments() {
        // Register optional arguments for expire_snapshots
        namedArguments.registerOptionalArgument(OLDER_THAN,
                "Timestamp before which snapshots will be removed",
                null, ArgumentParsers.nonEmptyString(OLDER_THAN));
        namedArguments.registerOptionalArgument(RETAIN_LAST,
                "Number of ancestor snapshots to preserve regardless of older_than",
                null, ArgumentParsers.positiveInt(RETAIN_LAST));
        namedArguments.registerOptionalArgument(MAX_CONCURRENT_DELETES,
                "Size of the thread pool used for delete file actions (0 disables, "
                        + "ignored for FileIOs that support bulk deletes)",
                0, ArgumentParsers.intRange(MAX_CONCURRENT_DELETES, 0, Integer.MAX_VALUE));
        namedArguments.registerOptionalArgument(SNAPSHOT_IDS,
                "Array of snapshot IDs to expire",
                null, ArgumentParsers.nonEmptyString(SNAPSHOT_IDS));
        namedArguments.registerOptionalArgument(CLEAN_EXPIRED_METADATA,
                "When true, cleans up metadata such as partition specs and schemas",
                null, ArgumentParsers.booleanValue(CLEAN_EXPIRED_METADATA));
    }

    @Override
    protected void validateIcebergAction() throws UserException {
        // Validate older_than parameter (timestamp)
        String olderThan = namedArguments.getString(OLDER_THAN);
        if (olderThan != null) {
            try {
                // Try to parse as ISO datetime format
                LocalDateTime.parse(olderThan, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            } catch (DateTimeParseException e) {
                try {
                    // Try to parse as timestamp (milliseconds since epoch)
                    long timestamp = Long.parseLong(olderThan);
                    if (timestamp < 0) {
                        throw new AnalysisException("older_than timestamp must be non-negative");
                    }
                } catch (NumberFormatException nfe) {
                    throw new AnalysisException("Invalid older_than format. Expected ISO datetime "
                            + "(yyyy-MM-ddTHH:mm:ss) or timestamp in milliseconds: " + olderThan);
                }
            }
        }

        // Validate retain_last parameter
        Integer retainLast = namedArguments.getInt(RETAIN_LAST);
        if (retainLast != null && retainLast < 1) {
            throw new AnalysisException("retain_last must be at least 1");
        }

        // Get snapshot_ids for validation
        String snapshotIds = namedArguments.getString(SNAPSHOT_IDS);

        // Validate snapshot_ids format if provided
        if (snapshotIds != null) {
            for (String idStr : snapshotIds.split(",")) {
                try {
                    Long.parseLong(idStr.trim());
                } catch (NumberFormatException e) {
                    throw new AnalysisException("Invalid snapshot_id format: " + idStr.trim());
                }
            }
        }

        // At least one of older_than, retain_last, or snapshot_ids must be specified
        if (olderThan == null && retainLast == null && snapshotIds == null) {
            throw new AnalysisException("At least one of 'older_than', 'retain_last', or "
                    + "'snapshot_ids' must be specified");
        }

        // Iceberg procedures don't support partitions or where conditions
        validateNoPartitions();
        validateNoWhereCondition();
    }

    @Override
    protected List<String> executeAction(TableIf table) throws UserException {
        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();

        // Parse parameters
        String olderThan = namedArguments.getString(OLDER_THAN);
        Integer retainLast = namedArguments.getInt(RETAIN_LAST);
        String snapshotIdsStr = namedArguments.getString(SNAPSHOT_IDS);
        Boolean cleanExpiredMetadata = namedArguments.getBoolean(CLEAN_EXPIRED_METADATA);
        Integer maxConcurrentDeletes = namedArguments.getInt(MAX_CONCURRENT_DELETES);

        // Track deleted file counts using callbacks (matching Spark's 6-column schema)
        AtomicLong deletedDataFilesCount = new AtomicLong(0);
        AtomicLong deletedPositionDeleteFilesCount = new AtomicLong(0);
        AtomicLong deletedEqualityDeleteFilesCount = new AtomicLong(0);
        AtomicLong deletedManifestFilesCount = new AtomicLong(0);
        AtomicLong deletedManifestListsCount = new AtomicLong(0);
        AtomicLong deletedStatisticsFilesCount = new AtomicLong(0);

        ExecutorService deleteExecutor = null;
        try {
            Map<String, FileContent> deleteFileContentByPath =
                    buildDeleteFileContentMap(icebergTable);
            ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();

            // Configure older_than timestamp
            // If retain_last is specified without older_than, use current time as the cutoff
            // This is because Iceberg's retainLast only works in conjunction with expireOlderThan
            if (olderThan != null) {
                long timestampMillis = parseTimestamp(olderThan);
                expireSnapshots.expireOlderThan(timestampMillis);
            } else if (retainLast != null && snapshotIdsStr == null) {
                // When only retain_last is specified, expire all snapshots older than now
                // but keep at least retain_last snapshots
                expireSnapshots.expireOlderThan(System.currentTimeMillis());
            }

            // Configure retain_last
            if (retainLast != null) {
                expireSnapshots.retainLast(retainLast);
            }

            // Configure specific snapshot IDs to expire
            if (snapshotIdsStr != null) {
                for (String idStr : snapshotIdsStr.split(",")) {
                    expireSnapshots.expireSnapshotId(Long.parseLong(idStr.trim()));
                }
            }

            // Configure clean expired metadata
            if (cleanExpiredMetadata != null) {
                expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
            }

            // Set up ExecutorService for concurrent deletes if specified
            if (maxConcurrentDeletes > 0) {
                if (icebergTable.io() instanceof SupportsBulkOperations) {
                    LOG.warn("max_concurrent_deletes only works with FileIOs that do not support "
                            + "bulk deletes. This table is currently using {} which supports bulk deletes "
                            + "so the parameter will be ignored.",
                            icebergTable.io().getClass().getName());
                } else {
                    deleteExecutor = Executors.newFixedThreadPool(maxConcurrentDeletes);
                    expireSnapshots.executeDeleteWith(deleteExecutor);
                }
            }

            // Set up delete callback to count files by type
            expireSnapshots.deleteWith(path -> {
                FileContent deleteContent = deleteFileContentByPath.get(path);
                if (deleteContent == FileContent.POSITION_DELETES) {
                    deletedPositionDeleteFilesCount.incrementAndGet();
                } else if (deleteContent == FileContent.EQUALITY_DELETES) {
                    deletedEqualityDeleteFilesCount.incrementAndGet();
                } else if (path.contains("-m-") && path.endsWith(".avro")) {
                    deletedManifestFilesCount.incrementAndGet();
                } else if (path.contains("snap-") && path.endsWith(".avro")) {
                    deletedManifestListsCount.incrementAndGet();
                } else if (path.endsWith(".stats") || path.contains("statistics")) {
                    deletedStatisticsFilesCount.incrementAndGet();
                } else {
                    deletedDataFilesCount.incrementAndGet();
                }
                icebergTable.io().deleteFile(path);
            });

            // Execute and commit
            expireSnapshots.commit();

            // Invalidate cache
            Env.getCurrentEnv().getExtMetaCacheMgr()
                .invalidateTableCache((ExternalTable) table);

            return Lists.newArrayList(
                String.valueOf(deletedDataFilesCount.get()),
                String.valueOf(deletedPositionDeleteFilesCount.get()),
                String.valueOf(deletedEqualityDeleteFilesCount.get()),
                String.valueOf(deletedManifestFilesCount.get()),
                String.valueOf(deletedManifestListsCount.get()),
                String.valueOf(deletedStatisticsFilesCount.get())
            );
        } catch (Exception e) {
            throw new UserException("Failed to expire snapshots: " + e.getMessage(), e);
        } finally {
            // Shutdown executor if created
            if (deleteExecutor != null) {
                deleteExecutor.shutdown();
            }
        }
    }

    /**
     * Parse timestamp string to milliseconds since epoch.
     * Supports ISO datetime format (yyyy-MM-ddTHH:mm:ss) or milliseconds.
     */
    private long parseTimestamp(String timestamp) {
        try {
            // Try ISO datetime format
            LocalDateTime dateTime = LocalDateTime.parse(timestamp,
                    DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return dateTime.atZone(ZoneId.systemDefault())
                .toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            // Try as milliseconds
            return Long.parseLong(timestamp);
        }
    }

    private Map<String, FileContent> buildDeleteFileContentMap(Table icebergTable) throws UserException {
        Map<String, FileContent> deleteFileContentByPath = new HashMap<>();
        try {
            for (org.apache.iceberg.Snapshot snapshot : icebergTable.snapshots()) {
                List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io());
                if (deleteManifests == null || deleteManifests.isEmpty()) {
                    continue;
                }
                for (ManifestFile manifest : deleteManifests) {
                    try (CloseableIterable<DeleteFile> deleteFiles = ManifestFiles.readDeleteManifest(
                            manifest, icebergTable.io(), icebergTable.specs())) {
                        for (DeleteFile deleteFile : deleteFiles) {
                            deleteFileContentByPath.putIfAbsent(
                                    deleteFile.location(), deleteFile.content());
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new UserException("Failed to build delete file content map: " + e.getMessage(), e);
        }
        return deleteFileContentByPath;
    }

    @Override
    protected List<Column> getResultSchema() {
        return Lists.newArrayList(
            new Column("deleted_data_files_count", Type.BIGINT, false,
                "Number of data files deleted"),
            new Column("deleted_position_delete_files_count", Type.BIGINT, false,
                "Number of position delete files deleted"),
            new Column("deleted_equality_delete_files_count", Type.BIGINT, false,
                "Number of equality delete files deleted"),
            new Column("deleted_manifest_files_count", Type.BIGINT, false,
                "Number of manifest files deleted"),
            new Column("deleted_manifest_lists_count", Type.BIGINT, false,
                "Number of manifest list files deleted"),
            new Column("deleted_statistics_files_count", Type.BIGINT, false,
                "Number of statistics files deleted")
        );
    }

    @Override
    public String getDescription() {
        return "Expire old Iceberg snapshots to free up storage space and improve metadata performance";
    }
}