// RewriteManifestExecutor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.iceberg.rewrite;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
 * Executor for Iceberg manifest rewrite operations.
 *
 * <p>Runs a {@link RewriteManifests} operation against a table, optionally restricted to the
 * manifests of a single partition spec, and reports how many manifests were targeted and how
 * many new manifests appeared in the resulting snapshot.
 */
public class RewriteManifestExecutor {
    private static final Logger LOG = LogManager.getLogger(RewriteManifestExecutor.class);

    /**
     * Outcome of a manifest rewrite.
     *
     * <p>{@code rewrittenCount} is the number of data manifests that were targeted by the
     * rewrite (counted before the commit); {@code addedCount} is the number of targeted-spec
     * data manifests in the new snapshot whose path did not exist before the commit.
     */
    public static class Result {
        private final int rewrittenCount;
        private final int addedCount;

        public Result(int rewrittenCount, int addedCount) {
            this.rewrittenCount = rewrittenCount;
            this.addedCount = addedCount;
        }

        /**
         * Renders the result as a two-element row: {@code [rewrittenCount, addedCount]}.
         *
         * @return a fixed-size list holding both counters as decimal strings
         */
        public List<String> toStringList() {
            return java.util.Arrays.asList(String.valueOf(rewrittenCount),
                    String.valueOf(addedCount));
        }
    }

    /**
     * Execute manifest rewrite using the Iceberg {@link RewriteManifests} API.
     *
     * @param table    the Iceberg table whose data manifests are rewritten
     * @param extTable the table handle used to invalidate the table cache after the commit
     * @param specId   partition spec id to restrict the rewrite to, or {@code null} to target
     *                 every data manifest of the current snapshot
     * @return the targeted/added manifest counts; {@code (0, 0)} when the table has no current
     *         snapshot or no data manifest matches {@code specId}
     * @throws UserException if any step of the rewrite fails; the original exception is the cause
     */
    public Result execute(Table table, ExternalTable extTable, Integer specId) throws UserException {
        try {
            // An empty table has no snapshot, hence nothing to rewrite.
            Snapshot currentSnapshot = table.currentSnapshot();
            if (currentSnapshot == null) {
                return new Result(0, 0);
            }

            // Collect the manifests the rewrite will target (optionally narrowed by specId).
            List<ManifestFile> manifestsBefore = currentSnapshot.dataManifests(table.io());
            List<ManifestFile> manifestsBeforeTargeted = filterBySpecId(manifestsBefore, specId);
            int rewrittenCount = manifestsBeforeTargeted.size();
            if (rewrittenCount == 0) {
                return new Result(0, 0);
            }

            // Configure the rewrite, restricting it to the requested partition spec if given.
            // NOTE(review): no clusterBy(...) is configured; confirm against the Iceberg
            // RewriteManifests contract that manifests are actually regrouped in that case,
            // otherwise rewrittenCount only reflects the targeted manifests, not rewritten ones.
            RewriteManifests rm = table.rewriteManifests();
            if (specId != null) {
                final int targetSpecId = specId;
                rm.rewriteIf(manifest -> manifest.partitionSpecId() == targetSpecId);
            }
            rm.commit();

            // The commit changed table metadata, so drop the cached table state right away.
            // Doing it here (rather than at the very end) guarantees the cache is invalidated
            // even if gathering the post-rewrite statistics below returns early or fails.
            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);

            Snapshot snapshotAfter = table.currentSnapshot();
            if (snapshotAfter == null) {
                return new Result(rewrittenCount, 0);
            }

            // Manifests of the targeted spec that exist now but did not exist before the commit.
            List<ManifestFile> manifestsAfterTargeted =
                    filterBySpecId(snapshotAfter.dataManifests(table.io()), specId);
            int addedCount = countNewManifests(manifestsBeforeTargeted, manifestsAfterTargeted);
            return new Result(rewrittenCount, addedCount);
        } catch (Exception e) {
            LOG.warn("Failed to execute manifest rewrite for table: {}", extTable.getName(), e);
            throw new UserException("Failed to rewrite manifests: " + e.getMessage(), e);
        }
    }

    /**
     * Counts the manifests in {@code after} whose path is not present in {@code before}.
     */
    private static int countNewManifests(List<ManifestFile> before, List<ManifestFile> after) {
        java.util.Set<String> beforePaths = before.stream()
                .map(ManifestFile::path)
                .collect(java.util.stream.Collectors.toSet());
        return (int) after.stream()
                .map(ManifestFile::path)
                .filter(path -> !beforePaths.contains(path))
                .count();
    }

    /**
     * Returns the manifests written with the given partition spec id, or the input list itself
     * when {@code specId} is {@code null} (no filtering requested).
     */
    private static List<ManifestFile> filterBySpecId(List<ManifestFile> manifests, Integer specId) {
        if (specId == null) {
            return manifests;
        }
        final int targetSpecId = specId;
        return manifests.stream()
                .filter(manifest -> manifest.partitionSpecId() == targetSpecId)
                .collect(java.util.stream.Collectors.toList());
    }
}