DiskMap.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.common.util.collection;

import org.apache.hudi.common.util.FileIOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;

/* Copied From
 * https://github.com/apache/hudi/blob/release-0.15.0/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskMap.java
 * Doris Modification.
 * Use Static cleaner class to avoid circular references in shutdown hooks
 */

/**
 * Base class for maps that store records on local disk after they spill over
 * from memory. Used by {@link ExternalSpillableMap}.
 *
 * <p>Each instance owns its own working directory, named
 * {@code <basePath>/hudi-<prefix>-<random UUID>}. That directory is deleted when
 * {@link #close()} is called. If the map is never closed, a single JVM-wide
 * shutdown hook deletes it instead.
 *
 * @param <T> The generic type of the keys
 * @param <R> The generic type of the values
 */
public abstract class DiskMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {

  private static final String SUBFOLDER_PREFIX = "hudi";

  // Base path for the write file
  protected final String diskMapPath;

  /**
   * Creates a fresh, empty working directory for this map and registers it for
   * deletion on JVM exit.
   *
   * @param basePath parent directory under which the working directory is created
   * @param prefix   name component identifying the kind of disk map
   * @throws IOException if {@code FileIOUtils} fails to clear or create the directory
   */
  public DiskMap(String basePath, String prefix) throws IOException {
    this.diskMapPath = String.format("%s/%s-%s-%s", basePath, SUBFOLDER_PREFIX, prefix, UUID.randomUUID().toString());
    File diskMapPathFile = new File(diskMapPath);
    // Start from a clean slate in case a directory with this name already exists.
    FileIOUtils.deleteDirectory(diskMapPathFile);
    FileIOUtils.mkdir(diskMapPathFile);
    // Deletion on JVM exit is handled by DiskMapCleaner, not by File#deleteOnExit().
    // A deleteOnExit() request is kept for the lifetime of the JVM and cannot be cancelled.
    // In a long-running process those requests would accumulate for every map ever created.
    // It also cannot remove a non-empty directory, so it would add nothing here.
    registerForCleanupOnExit();
  }

  /**
   * Registers this map's working directory with the JVM-wide cleaner. The
   * cleaner keeps only the path string, so no reference to this instance is
   * retained by the shutdown hook.
   */
  private void registerForCleanupOnExit() {
    DiskMapCleaner.registerForCleanup(diskMapPath);
  }

  /**
   * Returns a stream over the values stored on disk.
   *
   * @return a stream of the values stored in the disk
   */
  abstract Stream<R> valueStream();

  /**
   * Returns the number of bytes spilled to disk.
   *
   * @return number of bytes spilled to disk
   */
  abstract long sizeOfFileOnDiskInBytes();

  /**
   * Close and cleanup the Map. Deletes the working directory, then removes it
   * from the set of paths the shutdown hook will delete. A deletion failure is
   * logged rather than thrown.
   */
  public void close() {
    DiskMapCleaner.cleanupDirectory(diskMapPath);
    // The directory has been handled here, so the shutdown hook need not revisit it.
    DiskMapCleaner.deregisterFromCleanup(diskMapPath);
  }

  /**
   * JVM-wide registry of disk map directories that still need to be deleted.
   *
   * <p>Only path strings are stored, never {@code DiskMap} instances. A map that
   * is no longer referenced can therefore be garbage collected while its
   * directory is still registered. A single shutdown hook is installed when this
   * class is first initialized.
   */
  private static final class DiskMapCleaner {
    private static final Logger CLEANER_LOG = LoggerFactory.getLogger(DiskMapCleaner.class);
    private static final Set<String> PATHS_TO_CLEAN = Collections.synchronizedSet(new HashSet<>());

    static {
      // Register a single JVM-wide shutdown hook that handles all paths.
      Runtime.getRuntime().addShutdownHook(new Thread(DiskMapCleaner::cleanupAllRegistered));
    }

    private DiskMapCleaner() {
      // Static utility holder; not instantiable.
    }

    /**
     * Register a path to be cleaned up when the JVM exits.
     *
     * @param directoryPath Path to register for cleanup
     */
    static void registerForCleanup(String directoryPath) {
      PATHS_TO_CLEAN.add(directoryPath);
    }

    /**
     * Deregister a path from cleanup once it has been cleaned manually.
     *
     * @param directoryPath Path to deregister from cleanup
     */
    static void deregisterFromCleanup(String directoryPath) {
      PATHS_TO_CLEAN.remove(directoryPath);
    }

    /**
     * Deletes every registered directory and clears the registry. Runs from the
     * shutdown hook.
     */
    private static void cleanupAllRegistered() {
      // Iterating a synchronizedSet requires holding its monitor.
      synchronized (PATHS_TO_CLEAN) {
        for (String path : PATHS_TO_CLEAN) {
          try {
            cleanupDirectory(path);
          } catch (RuntimeException e) {
            // Keep going: an unexpected failure on one path must not leave the others behind.
            CLEANER_LOG.warn("Unexpected error while deleting the disk map directory={}", path, e);
          }
        }
        PATHS_TO_CLEAN.clear();
      }
    }

    /**
     * Deletes a directory and logs, rather than propagates, any
     * {@link IOException}. Holds no reference to any {@code DiskMap} instance.
     *
     * @param directoryPath Path to the directory that needs to be cleaned up
     */
    static void cleanupDirectory(String directoryPath) {
      try {
        FileIOUtils.deleteDirectory(new File(directoryPath));
      } catch (IOException exception) {
        CLEANER_LOG.warn("Error while deleting the disk map directory={}", directoryPath, exception);
      }
    }
  }
}