ManifestRewriteExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.rewrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.ExternalTable;

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
 * Executes Iceberg manifest rewrites through the {@link RewriteManifests} API and invalidates the
 * Doris external-table cache after a successful commit.
 */
public class ManifestRewriteExecutor {
    private static final Logger LOG = LogManager.getLogger(ManifestRewriteExecutor.class);

    // Grace period for scan tasks to finish after shutdown() before they are force-cancelled.
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60;
    // Grace period for scan tasks to react to shutdownNow() before we give up and log.
    private static final long FORCED_SHUTDOWN_TIMEOUT_SECONDS = 10;

    private static final String SCAN_THREAD_NAME = "iceberg-rewrite-manifests-scan";

    /**
     * Outcome of a manifest rewrite.
     *
     * <p>{@code rewrittenCount} is the number of data manifests of the pre-rewrite snapshot that
     * matched the selection predicate. {@code totalCount} is the number of data manifests that
     * snapshot had.
     */
    public static class Result {
        private final int rewrittenCount;
        private final int totalCount;

        public Result(int rewrittenCount, int totalCount) {
            this.rewrittenCount = rewrittenCount;
            this.totalCount = totalCount;
        }

        public int getRewrittenCount() {
            return rewrittenCount;
        }

        public int getTotalCount() {
            return totalCount;
        }

        /**
         * Returns the result as a two-element row: the rewritten count followed by the total count.
         */
        public List<String> toStringList() {
            return java.util.Arrays.asList(String.valueOf(rewrittenCount),
                    String.valueOf(totalCount));
        }
    }

    /**
     * Rewrites the data manifests of {@code table}'s current snapshot that match {@code predicate}.
     *
     * @param table              Iceberg table to rewrite
     * @param extTable           Doris external table whose metadata cache is invalidated after commit
     * @param clusterByPartition if true, data files are grouped into new manifests by partition;
     *                           otherwise all selected manifests are merged without partition
     *                           clustering
     * @param scanThreads        size of a dedicated thread pool used to scan manifests; values
     *                           {@code <= 0} leave scanning to Iceberg's default
     * @param predicate          selects which data manifests to rewrite; {@code null} selects all
     * @return counts of selected and total data manifests; {@code (0, 0)} if the table has no
     *         snapshot
     */
    public Result execute(Table table, ExternalTable extTable,
                          boolean clusterByPartition, int scanThreads,
                          Predicate<ManifestFile> predicate) {
        // A null predicate means "every manifest", mirroring RewriteManifests when rewriteIf is unset
        // (and avoiding an NPE from Stream.filter(null)).
        Predicate<ManifestFile> selector = predicate != null ? predicate : manifest -> true;

        Snapshot currentSnapshot = table.currentSnapshot();
        if (currentSnapshot == null) {
            return new Result(0, 0);
        }

        // Manifest statistics taken before the rewrite.
        List<ManifestFile> dataManifests = currentSnapshot.dataManifests(table.io());
        int totalManifests = dataManifests.size();
        int selectedManifests = (int) dataManifests.stream()
                .filter(selector)
                .count();
        if (selectedManifests == 0) {
            // Nothing to rewrite: skip the commit so no empty snapshot is produced
            // and the cache stays valid.
            return new Result(0, totalManifests);
        }

        ExecutorService executor = null;
        try {
            RewriteManifests rm = table.rewriteManifests();

            if (clusterByPartition) {
                rm.clusterBy(ContentFile::partition);
            } else {
                // Iceberg's RewriteManifests only rewrites selected manifests when a cluster
                // function is set; without one the commit keeps every manifest unchanged.
                // A constant key merges the selected manifests without partition clustering.
                rm.clusterBy(file -> "");
            }

            if (scanThreads > 0) {
                executor = Executors.newFixedThreadPool(scanThreads, runnable -> {
                    Thread thread = new Thread(runnable, SCAN_THREAD_NAME);
                    // Daemon threads never keep the FE process alive if shutdown stalls.
                    thread.setDaemon(true);
                    return thread;
                });
                rm.scanManifestsWith(executor);
            }

            rm.rewriteIf(selector).commit();

            // The table's manifest layout changed; drop cached metadata.
            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
            return new Result(selectedManifests, totalManifests);
        } finally {
            if (executor != null) {
                shutdownExecutor(executor);
            }
        }
    }

    /**
     * Shuts down {@code executor}, waiting briefly for running tasks and force-cancelling them if
     * they do not finish. Restores the caller's interrupt flag if interrupted while waiting.
     */
    private void shutdownExecutor(ExecutorService executor) {
        // Disable new tasks from being submitted.
        executor.shutdown();
        try {
            // Wait a while for existing tasks to terminate.
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                // Cancel currently executing tasks.
                executor.shutdownNow();
                // Wait a while for tasks to respond to being cancelled.
                if (!executor.awaitTermination(FORCED_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    LOG.warn("ExecutorService did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)cancel if the current thread was interrupted, and preserve the interrupt status.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}