ProfileArchiveManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.profile;
import org.apache.doris.common.Config;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
 * ProfileArchiveManager handles archiving of profile files from the spilled profile storage
 * to compressed ZIP archives. This is triggered when the number of profiles or total storage
 * size exceeds configured limits.
 *
 * <p>Archive Strategy:
 * <ul>
 * <li>Profiles are moved to the pending directory first</li>
 * <li>When pending has {@code >=} batch_size files OR its oldest file is older than
 * Config.profile_archive_pending_timeout_seconds: archiving is triggered</li>
 * <li>Batch size: passed to the constructor (the single-argument constructor uses 100);
 * presumably sourced from Config.profile_archive_batch_size by the caller</li>
 * <li>Naming pattern: profiles_YYYYMMDD_HHMMSS_YYYYMMDD_HHMMSS.zip (a _N suffix is appended
 * if that name is already taken)</li>
 * <li>Archive location: ${spilled_profile_storage_path}/archive, or Config.profile_archive_path
 * if it is set</li>
 * <li>Archives oldest profiles first (by the finish timestamp encoded in the file name)</li>
 * </ul>
 *
 * <p>Directory Structure (default archive location):
 * <pre>
 * ${spilled_profile_storage_path}/
 * |-- {timestamp}_{queryid}.zip            (active spilled profiles)
 * `-- archive/
 *     |-- pending/                         (pending buffer for archiving)
 *     |   `-- {timestamp}_{queryid}.zip
 *     `-- profiles_20240101_000000_20240101_235959.zip   (archived ZIPs)
 * </pre>
 */
public class ProfileArchiveManager {
    private static final Logger LOG = LogManager.getLogger(ProfileArchiveManager.class);

    // Default batch size for archiving profiles; also used when a non-positive batch size is supplied
    private static final int DEFAULT_ARCHIVE_BATCH_SIZE = 100;

    // Date format for archive file names (DateTimeFormatter is immutable and thread-safe)
    private static final DateTimeFormatter DATE_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss").withZone(ZoneId.systemDefault());

    // Archive directory name
    private static final String ARCHIVE_DIR_NAME = "archive";

    // Pending directory name (buffer for files waiting to be archived)
    private static final String PENDING_DIR_NAME = "pending";

    // Archive file names look like profiles_<min>_<max>[_N].zip
    private static final String ARCHIVE_FILE_PREFIX = "profiles_";

    // Both spilled profiles and archives use this extension
    private static final String ZIP_SUFFIX = ".zip";

    // Buffer size used when copying a profile into an archive entry
    private static final int COPY_BUFFER_SIZE = 8192;

    private final String spilledProfileStoragePath;
    private final String archivePath;
    private final String pendingPath;
    private final int archiveBatchSize;

    /**
     * Constructs a ProfileArchiveManager with default batch size.
     *
     * @param spilledProfileStoragePath the path where spilled profiles are stored
     */
    public ProfileArchiveManager(String spilledProfileStoragePath) {
        this(spilledProfileStoragePath, DEFAULT_ARCHIVE_BATCH_SIZE);
    }

    /**
     * Constructs a ProfileArchiveManager with custom batch size.
     *
     * <p>The archive directory is {@code Config.profile_archive_path} when it is non-empty, otherwise
     * {@code <spilledProfileStoragePath>/archive}. The pending directory is {@code <archive>/pending}.
     *
     * @param spilledProfileStoragePath the path where spilled profiles are stored
     * @param archiveBatchSize number of profiles to include in each archive; values {@code <= 0}
     *        are replaced by the default batch size
     */
    public ProfileArchiveManager(String spilledProfileStoragePath, int archiveBatchSize) {
        this.spilledProfileStoragePath = spilledProfileStoragePath;
        // Use custom archive path from config if specified, otherwise use default
        this.archivePath = Config.profile_archive_path.isEmpty()
                ? spilledProfileStoragePath + File.separator + ARCHIVE_DIR_NAME
                : Config.profile_archive_path;
        this.pendingPath = archivePath + File.separator + PENDING_DIR_NAME;
        if (archiveBatchSize <= 0) {
            // A non-positive batch size would divide by zero in checkAndArchivePendingProfiles and
            // make the batch loops never terminate, so fall back instead of failing at runtime.
            LOG.warn("Invalid profile archive batch size {}, use default value {}",
                    archiveBatchSize, DEFAULT_ARCHIVE_BATCH_SIZE);
            this.archiveBatchSize = DEFAULT_ARCHIVE_BATCH_SIZE;
        } else {
            this.archiveBatchSize = archiveBatchSize;
        }
    }

    /**
     * Creates the archive directory if it doesn't exist.
     *
     * @return true if directory exists or was created successfully
     */
    public boolean createArchiveDirectoryIfNecessary() {
        return createDirectoryIfNecessary(archivePath, "archive");
    }

    /**
     * Creates the pending directory if it doesn't exist.
     *
     * @return true if directory exists or was created successfully
     */
    public boolean createPendingDirectoryIfNecessary() {
        return createDirectoryIfNecessary(pendingPath, "pending");
    }

    /**
     * Ensures {@code path} is an existing directory, creating it (and missing parents) if needed.
     *
     * @param path directory to ensure
     * @param kind short label used in log messages ("archive" or "pending")
     * @return true if the path is a directory afterwards
     */
    private boolean createDirectoryIfNecessary(String path, String kind) {
        File dir = new File(path);
        if (dir.isDirectory()) {
            return true;
        }
        // mkdirs() returns false if another thread created the directory first, so re-check.
        if (dir.mkdirs() || dir.isDirectory()) {
            LOG.info("Created profile {} directory: {}", kind, path);
            return true;
        }
        LOG.error("Failed to create profile {} directory: {}", kind, path);
        return false;
    }

    /**
     * Gets the list of profile files from the spilled storage directory,
     * excluding subdirectories (such as the archive directory) and hidden files.
     *
     * @return mutable list of profile files sorted by filename timestamp (oldest first);
     *         files without a parseable timestamp are placed last
     */
    public List<File> getProfileFilesFromStorage() {
        File storageDir = new File(spilledProfileStoragePath);
        if (!storageDir.exists() || !storageDir.isDirectory()) {
            LOG.warn("Profile storage directory does not exist: {}", spilledProfileStoragePath);
            return new ArrayList<>();
        }
        // Only regular, non-hidden files; isFile() already skips the archive directory
        File[] files = storageDir.listFiles(file -> file.isFile() && !file.getName().startsWith("."));
        return toSortedProfileList(files);
    }

    /**
     * Gets the list of profile files from the pending directory.
     *
     * @return mutable list of profile files sorted by filename timestamp (oldest first);
     *         files without a parseable timestamp are placed last
     */
    private List<File> getPendingProfileFiles() {
        File pendingDir = new File(pendingPath);
        if (!pendingDir.exists() || !pendingDir.isDirectory()) {
            return new ArrayList<>();
        }
        return toSortedProfileList(pendingDir.listFiles(File::isFile));
    }

    /**
     * Copies a directory listing into a mutable list sorted oldest-first by profile timestamp.
     *
     * @param files result of {@link File#listFiles}, may be null
     * @return sorted mutable list, empty if {@code files} is null or empty
     */
    private List<File> toSortedProfileList(File[] files) {
        if (files == null || files.length == 0) {
            return new ArrayList<>();
        }
        List<File> profileFiles = new ArrayList<>(Arrays.asList(files));
        profileFiles.sort(Comparator.comparingLong(file -> {
            long timestamp = extractTimestampFromProfileFile(file);
            // Files with invalid timestamps go to the end
            return timestamp == -1 ? Long.MAX_VALUE : timestamp;
        }));
        return profileFiles;
    }

    /**
     * Extracts timestamp from profile filename.
     * Profile filename format: timestamp_queryid.zip
     *
     * @param profileFile the profile file
     * @return the leading numeric part of the name (milliseconds), or -1 if the name does not end
     *         with ".zip" or the part before the first '_' is not a number
     */
    private long extractTimestampFromProfileFile(File profileFile) {
        String filename = profileFile.getName();
        if (!filename.endsWith(ZIP_SUFFIX)) {
            return -1;
        }
        String baseName = filename.substring(0, filename.length() - ZIP_SUFFIX.length());
        int separator = baseName.indexOf('_');
        String timestampPart = separator >= 0 ? baseName.substring(0, separator) : baseName;
        try {
            return Long.parseLong(timestampPart);
        } catch (NumberFormatException e) {
            LOG.warn("Failed to parse timestamp from profile filename: {}", profileFile.getName());
            return -1;
        }
    }

    /**
     * Extracts timestamp from archive filename.
     * Archive filename format: profiles_YYYYMMDD_HHMMSS_YYYYMMDD_HHMMSS.zip
     * or profiles_YYYYMMDD_HHMMSS_YYYYMMDD_HHMMSS_N.zip (with suffix)
     *
     * @param archiveFile the archive file
     * @return epoch milliseconds of the first (oldest) date in the name, interpreted in the system
     *         default time zone, or -1 if parsing fails
     */
    private long extractTimestampFromArchiveFile(File archiveFile) {
        String filename = archiveFile.getName();
        if (!filename.startsWith(ARCHIVE_FILE_PREFIX) || !filename.endsWith(ZIP_SUFFIX)) {
            return -1;
        }
        // Format after stripping prefix/suffix: YYYYMMDD_HHMMSS_YYYYMMDD_HHMMSS[_N]
        String dateStr = filename.substring(ARCHIVE_FILE_PREFIX.length(), filename.length() - ZIP_SUFFIX.length());
        String[] parts = dateStr.split("_");
        if (parts.length < 2) {
            return -1;
        }
        try {
            String firstTimestamp = parts[0] + "_" + parts[1];
            return Instant.from(DATE_FORMAT.parse(firstTimestamp)).toEpochMilli();
        } catch (Exception e) {
            LOG.warn("Failed to parse timestamp from archive filename: {}", archiveFile.getName(), e);
            return -1;
        }
    }

    /**
     * Archives a batch of profile files into a single ZIP archive.
     *
     * <p>Each profile becomes one entry named after the file. Profiles that cannot be opened are
     * skipped (and logged). If an I/O error occurs while an entry is being written, the partial
     * archive is deleted and null is returned, so a non-null result never contains truncated entries.
     *
     * @param profilesToArchive list of profile files to archive
     * @return the created archive file, or null if archiving failed or no profile could be written
     */
    public File archiveProfiles(List<File> profilesToArchive) {
        return writeArchive(profilesToArchive, new ArrayList<>());
    }

    /**
     * Writes {@code profilesToArchive} into a new archive and reports which files were stored.
     *
     * @param profilesToArchive profiles to put into the archive
     * @param archivedProfiles output list; on success receives exactly the files written to the archive
     * @return the created archive file, or null on failure (in which case {@code archivedProfiles}
     *         is left unchanged and no partial archive is kept)
     */
    private File writeArchive(List<File> profilesToArchive, List<File> archivedProfiles) {
        if (profilesToArchive == null || profilesToArchive.isEmpty()) {
            LOG.warn("No profiles to archive");
            return null;
        }
        if (!createArchiveDirectoryIfNecessary()) {
            LOG.error("Failed to create archive directory");
            return null;
        }

        // Extract timestamps to determine archive filename
        List<Long> timestamps = profilesToArchive.stream()
                .map(this::extractTimestampFromProfileFile)
                .filter(ts -> ts > 0)
                .sorted()
                .collect(Collectors.toList());
        if (timestamps.isEmpty()) {
            LOG.error("Failed to extract timestamps from profile files");
            return null;
        }
        String minDateStr = DATE_FORMAT.format(Instant.ofEpochMilli(timestamps.get(0)));
        String maxDateStr = DATE_FORMAT.format(Instant.ofEpochMilli(timestamps.get(timestamps.size() - 1)));

        // profiles_YYYYMMDD_HHMMSS_YYYYMMDD_HHMMSS.zip, with _N appended while the name is taken
        File archiveFile = new File(archivePath, String.format("profiles_%s_%s.zip", minDateStr, maxDateStr));
        for (int suffix = 1; archiveFile.exists(); suffix++) {
            archiveFile = new File(archivePath,
                    String.format("profiles_%s_%s_%d.zip", minDateStr, maxDateStr, suffix));
        }
        String archiveFilename = archiveFile.getName();

        List<File> written = new ArrayList<>();
        try (FileOutputStream fos = new FileOutputStream(archiveFile);
                ZipOutputStream zos = new ZipOutputStream(fos)) {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            for (File profileFile : profilesToArchive) {
                if (addProfileToArchive(zos, profileFile, buffer)) {
                    written.add(profileFile);
                }
            }
            if (written.isEmpty()) {
                // An empty archive is useless and must not be reported as success.
                throw new IOException("none of the " + profilesToArchive.size() + " profiles could be read");
            }
        } catch (IOException e) {
            LOG.error("Failed to create archive file: {}", archiveFilename, e);
            // Clean up partial archive file
            FileUtils.deleteQuietly(archiveFile);
            return null;
        }
        archivedProfiles.addAll(written);
        LOG.info("Created archive {} with {} profiles", archiveFilename, written.size());
        return archiveFile;
    }

    /**
     * Copies one profile file into the archive as an entry named after the file.
     *
     * @param zos archive being written
     * @param profileFile profile to add
     * @param buffer reusable copy buffer
     * @return true if the entry was written, false if the file could not be opened (it is skipped)
     * @throws IOException if an error occurs after the entry was started; the archive is then
     *         unusable and the caller must discard it
     */
    private boolean addProfileToArchive(ZipOutputStream zos, File profileFile, byte[] buffer)
            throws IOException {
        FileInputStream fis;
        try {
            fis = new FileInputStream(profileFile);
        } catch (IOException e) {
            // Missing or unreadable: skip it so the caller does not delete it as "archived".
            LOG.error("Failed to add profile {} to archive", profileFile.getName(), e);
            return false;
        }
        try (FileInputStream in = fis) {
            // Add profile file to archive with its original name
            zos.putNextEntry(new ZipEntry(profileFile.getName()));
            int bytesRead;
            while ((bytesRead = in.read(buffer)) != -1) {
                zos.write(buffer, 0, bytesRead);
            }
            zos.closeEntry();
        }
        return true;
    }

    /**
     * Deletes profile files from the spilled storage after they have been archived.
     *
     * @param profileFiles list of profile files to delete
     * @return number of files successfully deleted
     */
    public int deleteArchivedProfiles(List<File> profileFiles) {
        int deletedCount = 0;
        for (File profileFile : profileFiles) {
            if (FileUtils.deleteQuietly(profileFile)) {
                deletedCount++;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Deleted archived profile: {}", profileFile.getName());
                }
            } else {
                LOG.warn("Failed to delete archived profile: {}", profileFile.getName());
            }
        }
        LOG.info("Deleted {} out of {} archived profiles", deletedCount, profileFiles.size());
        return deletedCount;
    }

    /**
     * Moves a profile file to the pending archive directory, replacing a same-named file there.
     * The file will be archived later when batch size is reached or timeout occurs.
     *
     * @param profileFile the profile file to move
     * @return true if the file was moved successfully, false otherwise
     */
    public boolean moveToArchivePending(File profileFile) {
        if (profileFile == null || !profileFile.exists()) {
            LOG.warn("Profile file does not exist: {}", profileFile);
            return false;
        }
        if (!createPendingDirectoryIfNecessary()) {
            LOG.error("Failed to create pending directory");
            return false;
        }
        File targetFile = new File(pendingPath, profileFile.getName());
        try {
            Files.move(profileFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Moved profile to pending: {}", profileFile.getName());
            }
            return true;
        } catch (IOException e) {
            LOG.error("Failed to move profile to pending: {}", profileFile.getName(), e);
            return false;
        }
    }

    /**
     * Checks the pending directory and archives profiles if conditions are met.
     * Archiving is triggered when:
     * 1. Number of pending files {@code >=} batch size, OR
     * 2. Oldest file in pending (by filename timestamp) is at least
     *    Config.profile_archive_pending_timeout_seconds old
     *
     * Archiving strategy:
     * - If triggered by batch size: only archive complete batches, leave incomplete batch in pending
     * - If triggered by timeout: archive all files including incomplete batch
     *
     * Only profiles actually written to an archive are deleted from pending.
     *
     * @return number of profiles successfully archived (and removed from pending)
     */
    public int checkAndArchivePendingProfiles() {
        File pendingDir = new File(pendingPath);
        if (!pendingDir.exists()) {
            return 0;
        }
        List<File> pendingFiles = getPendingProfileFiles();
        if (pendingFiles.isEmpty()) {
            return 0;
        }

        int pendingCount = pendingFiles.size();
        int filesToArchiveCount;
        if (pendingCount >= archiveBatchSize) {
            // Batch size triggered: only archive complete batches
            filesToArchiveCount = (pendingCount / archiveBatchSize) * archiveBatchSize;
            String reason = String.format("batch size reached (%d files >= %d)", pendingCount, archiveBatchSize);
            LOG.info("Archiving {} pending profiles (complete batches only), {} files remain in pending, reason: {}",
                    filesToArchiveCount, pendingCount - filesToArchiveCount, reason);
        } else {
            // Use filename timestamp (query finish time) instead of file modification time.
            // The list is sorted oldest first, so index 0 is the oldest pending profile.
            long oldestFileTimestamp = extractTimestampFromProfileFile(pendingFiles.get(0));
            long ageSeconds = (System.currentTimeMillis() - oldestFileTimestamp) / 1000;
            if (oldestFileTimestamp <= 0 || ageSeconds < Config.profile_archive_pending_timeout_seconds) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Pending profiles: {} files, not ready to archive", pendingCount);
                }
                return 0;
            }
            // Timeout triggered: archive all files including incomplete batch
            filesToArchiveCount = pendingCount;
            String reason = String.format("oldest file age %d seconds exceeds timeout %d seconds",
                    ageSeconds, Config.profile_archive_pending_timeout_seconds);
            LOG.info("Archiving all {} pending profiles due to timeout, reason: {}", filesToArchiveCount, reason);
        }

        int totalArchived = archiveInBatches(pendingFiles.subList(0, filesToArchiveCount));
        LOG.info("Successfully archived {} profiles from pending, {} files remain",
                totalArchived, pendingCount - totalArchived);
        return totalArchived;
    }

    /**
     * Archives profiles in batches from the spilled storage.
     * This should be called when storage limits are exceeded.
     *
     * @param numProfilesToArchive number of profiles to archive (oldest first)
     * @return number of profiles successfully archived (and removed from storage)
     */
    public int archiveOldestProfiles(int numProfilesToArchive) {
        if (numProfilesToArchive <= 0) {
            return 0;
        }
        List<File> allProfileFiles = getProfileFilesFromStorage();
        if (allProfileFiles.isEmpty()) {
            LOG.info("No profiles available to archive");
            return 0;
        }
        // Limit to available profiles
        int profilesToArchive = Math.min(numProfilesToArchive, allProfileFiles.size());
        int totalArchived = archiveInBatches(allProfileFiles.subList(0, profilesToArchive));
        LOG.info("Successfully archived {} profiles", totalArchived);
        return totalArchived;
    }

    /**
     * Archives {@code files} in consecutive batches of {@code archiveBatchSize}, deleting each
     * profile only after it has been written into a successfully completed archive.
     *
     * @param files profiles to archive, in the order they should be grouped
     * @return number of profiles archived and deleted
     */
    private int archiveInBatches(List<File> files) {
        int totalArchived = 0;
        for (int start = 0; start < files.size(); start += archiveBatchSize) {
            List<File> batch = files.subList(start, Math.min(start + archiveBatchSize, files.size()));
            List<File> archived = new ArrayList<>();
            if (writeArchive(batch, archived) != null) {
                // Delete only what actually went into the archive; skipped files stay for a retry.
                totalArchived += deleteArchivedProfiles(archived);
            } else {
                LOG.error("Failed to create archive for batch starting at index {}", start);
                // Continue with next batch
            }
        }
        return totalArchived;
    }

    /**
     * Gets the list of archive files ({@code profiles_*.zip}) sorted by archive timestamp (from filename).
     *
     * @return mutable list of archive files sorted oldest first; unparseable names are placed last
     */
    public List<File> getArchiveFiles() {
        File archiveDir = new File(archivePath);
        if (!archiveDir.exists() || !archiveDir.isDirectory()) {
            return new ArrayList<>();
        }
        File[] files = archiveDir.listFiles(file ->
                file.isFile() && file.getName().startsWith(ARCHIVE_FILE_PREFIX) && file.getName().endsWith(ZIP_SUFFIX)
        );
        if (files == null || files.length == 0) {
            return new ArrayList<>();
        }
        List<File> archiveFiles = new ArrayList<>(Arrays.asList(files));
        archiveFiles.sort(Comparator.comparingLong(file -> {
            long timestamp = extractTimestampFromArchiveFile(file);
            // Files with invalid timestamps go to the end
            return timestamp == -1 ? Long.MAX_VALUE : timestamp;
        }));
        return archiveFiles;
    }

    /**
     * Gets the total size of all archive files in bytes.
     *
     * @return total archive size in bytes
     */
    public long getTotalArchiveSize() {
        return getArchiveFiles().stream().mapToLong(File::length).sum();
    }

    /**
     * Gets the archive directory path.
     *
     * @return archive directory path
     */
    public String getArchivePath() {
        return archivePath;
    }

    /**
     * Cleans up old archive files that exceed the retention period.
     * Retention period is configured via Config.profile_archive_retention_seconds.
     * Negative means keep forever, 0 only logs a warning (nothing is deleted), {@code >0} means
     * delete after N seconds.
     *
     * The age of an archive file is determined by the first (oldest) timestamp in its filename
     * (profiles_YYYYMMDD_HHMMSS_...), not by the file system modification time.
     *
     * @return number of archive files deleted
     */
    public int cleanOldArchives() {
        if (Config.profile_archive_retention_seconds < 0) {
            // -1 means keep forever
            if (LOG.isDebugEnabled()) {
                LOG.debug("Archive retention is set to unlimited, no cleanup needed");
            }
            return 0;
        }
        if (Config.profile_archive_retention_seconds == 0) {
            // 0 means don't retain, but this should be handled by disabling archive feature
            LOG.warn("profile_archive_retention_seconds is 0, consider disabling archive feature");
            return 0;
        }
        List<File> archiveFiles = getArchiveFiles();
        if (archiveFiles.isEmpty()) {
            return 0;
        }
        long retentionMillis = Config.profile_archive_retention_seconds * 1000L;
        long currentTime = System.currentTimeMillis();
        int deletedCount = 0;
        for (File archiveFile : archiveFiles) {
            // Use timestamp from filename instead of file modification time
            long archiveTimestamp = extractTimestampFromArchiveFile(archiveFile);
            if (archiveTimestamp <= 0) {
                LOG.warn("Skipping archive file with invalid timestamp: {}", archiveFile.getName());
                continue;
            }
            long fileAge = currentTime - archiveTimestamp;
            if (fileAge < retentionMillis) {
                continue;
            }
            if (FileUtils.deleteQuietly(archiveFile)) {
                deletedCount++;
                LOG.info("Deleted old archive file: {}, age: {} seconds", archiveFile.getName(), fileAge / 1000);
            } else {
                LOG.warn("Failed to delete old archive file: {}", archiveFile.getName());
            }
        }
        if (deletedCount > 0) {
            LOG.info("Cleaned {} old archive files, retention: {} seconds",
                    deletedCount, Config.profile_archive_retention_seconds);
        }
        return deletedCount;
    }
}