// IcebergExpireSnapshotsAction.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg.action;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
 * Iceberg {@code expire_snapshots} action.
 *
 * <p>This action removes old snapshots from Iceberg tables to free up storage space and
 * improve metadata performance. This class registers and validates the action's named
 * arguments; {@link #executeAction(TableIf)} is not implemented yet and always throws.
 */
public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
    /** Argument name: timestamp before which snapshots will be removed. */
    public static final String OLDER_THAN = "older_than";
    /** Argument name: number of ancestor snapshots to preserve regardless of {@code older_than}. */
    public static final String RETAIN_LAST = "retain_last";
    /** Argument name: size of the thread pool used for delete file actions. */
    public static final String MAX_CONCURRENT_DELETES = "max_concurrent_deletes";
    /** Argument name: boolean flag controlling how deletion files are streamed. */
    public static final String STREAM_RESULTS = "stream_results";
    /** Argument name: explicit snapshot IDs to expire. */
    public static final String SNAPSHOT_IDS = "snapshot_ids";
    /** Argument name: boolean flag enabling cleanup of expired metadata (partition specs, schemas). */
    public static final String CLEAN_EXPIRED_METADATA = "clean_expired_metadata";

    /**
     * Creates the action; every argument is forwarded unchanged to the base class under the
     * action name {@code "expire_snapshots"}.
     *
     * @param properties         raw named arguments supplied by the user
     * @param partitionNamesInfo optional partition specification (rejected during validation)
     * @param whereCondition     optional WHERE condition (rejected during validation)
     * @param icebergTable       the Iceberg table the action targets
     */
    public IcebergExpireSnapshotsAction(Map<String, String> properties,
            Optional<PartitionNamesInfo> partitionNamesInfo,
            Optional<Expression> whereCondition,
            IcebergExternalTable icebergTable) {
        super("expire_snapshots", properties, partitionNamesInfo, whereCondition, icebergTable);
    }

    /**
     * Registers the optional named arguments accepted by {@code expire_snapshots}.
     * All of them are registered with a {@code null} default.
     */
    @Override
    protected void registerIcebergArguments() {
        namedArguments.registerOptionalArgument(OLDER_THAN,
                "Timestamp before which snapshots will be removed",
                null, ArgumentParsers.nonEmptyString(OLDER_THAN));
        namedArguments.registerOptionalArgument(RETAIN_LAST,
                "Number of ancestor snapshots to preserve regardless of older_than",
                null, ArgumentParsers.positiveInt(RETAIN_LAST));
        namedArguments.registerOptionalArgument(MAX_CONCURRENT_DELETES,
                "Size of the thread pool used for delete file actions",
                null, ArgumentParsers.positiveInt(MAX_CONCURRENT_DELETES));
        namedArguments.registerOptionalArgument(STREAM_RESULTS,
                "When true, deletion files will be sent to Spark driver by RDD partition",
                null, ArgumentParsers.booleanValue(STREAM_RESULTS));
        namedArguments.registerOptionalArgument(SNAPSHOT_IDS,
                "Array of snapshot IDs to expire",
                null, ArgumentParsers.nonEmptyString(SNAPSHOT_IDS));
        namedArguments.registerOptionalArgument(CLEAN_EXPIRED_METADATA,
                "When true, cleans up metadata such as partition specs and schemas",
                null, ArgumentParsers.booleanValue(CLEAN_EXPIRED_METADATA));
    }

    /**
     * Validates the parsed arguments.
     *
     * <ul>
     *   <li>{@code older_than}, when given, must be an ISO local datetime or a non-negative
     *       epoch-millisecond value.</li>
     *   <li>{@code retain_last}, when given, must be at least 1.</li>
     *   <li>At least one snapshot selector ({@code older_than}, {@code retain_last} or
     *       {@code snapshot_ids}) must be supplied.</li>
     *   <li>Partition specifications and WHERE conditions are not allowed.</li>
     * </ul>
     *
     * @throws UserException if any of the rules above is violated
     */
    @Override
    protected void validateIcebergAction() throws UserException {
        String olderThan = namedArguments.getString(OLDER_THAN);
        if (olderThan != null) {
            validateOlderThan(olderThan);
        }

        // NOTE(review): the positiveInt parser presumably already enforces this; kept as a
        // defensive check.
        Integer retainLast = namedArguments.getInt(RETAIN_LAST);
        if (retainLast != null && retainLast < 1) {
            throw new AnalysisException("retain_last must be at least 1");
        }

        // Explicit snapshot IDs are a selection criterion in their own right, so a call that
        // passes only snapshot_ids must not be rejected for lacking older_than/retain_last.
        String snapshotIds = namedArguments.getString(SNAPSHOT_IDS);
        if (olderThan == null && retainLast == null && snapshotIds == null) {
            throw new AnalysisException(
                    "At least one of 'older_than', 'retain_last' or 'snapshot_ids' must be specified");
        }

        // Iceberg procedures don't support partitions or where conditions
        validateNoPartitions();
        validateNoWhereCondition();
    }

    /**
     * Checks that {@code olderThan} is either an ISO local datetime or a non-negative long.
     *
     * @param olderThan the non-null raw {@code older_than} value
     * @throws AnalysisException if the value matches neither format or is a negative number
     */
    private static void validateOlderThan(String olderThan) throws AnalysisException {
        try {
            LocalDateTime.parse(olderThan, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return;
        } catch (DateTimeParseException ignored) {
            // Not an ISO datetime; fall through and try the epoch-millisecond form.
        }

        long timestamp;
        try {
            timestamp = Long.parseLong(olderThan);
        } catch (NumberFormatException nfe) {
            throw new AnalysisException("Invalid older_than format. Expected ISO datetime "
                    + "(yyyy-MM-ddTHH:mm:ss) or timestamp in milliseconds: " + olderThan);
        }
        if (timestamp < 0) {
            throw new AnalysisException("older_than timestamp must be non-negative");
        }
    }

    /**
     * Executes the action. Not implemented yet.
     *
     * @param table the target table (unused)
     * @return never returns normally
     * @throws UserException always, as a {@link DdlException} stating the procedure is
     *                       not implemented
     */
    @Override
    protected List<String> executeAction(TableIf table) throws UserException {
        throw new DdlException("Iceberg expire_snapshots procedure is not implemented yet");
    }

    /** Returns a one-line human-readable description of this action. */
    @Override
    public String getDescription() {
        return "Expire old Iceberg snapshots to free up storage space and improve metadata performance";
    }
}