// BackupHandler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.AbstractJob.JobType;
import org.apache.doris.backup.BackupJob.BackupJobState;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.info.PartitionNamesInfo;
import org.apache.doris.cloud.backup.CloudRestoreJob;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.info.TableRefInfo;
import org.apache.doris.nereids.trees.plans.commands.BackupCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelBackupCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRepositoryCommand;
import org.apache.doris.nereids.trees.plans.commands.RestoreCommand;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class BackupHandler extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(BackupHandler.class);
public static final int SIGNATURE_VERSION = 1;
// Local staging directories for snapshot data, derived from Config.tmp_dir.
public static final Path BACKUP_ROOT_DIR = Paths.get(Config.tmp_dir, "backup").normalize();
public static final Path RESTORE_ROOT_DIR = Paths.get(Config.tmp_dir, "restore").normalize();
// Manages all repositories; started together with this handler in start().
private RepositoryMgr repoMgr = new RepositoryMgr();
// this lock is used for updating dbIdToBackupOrRestoreJobs
private final ReentrantLock jobLock = new ReentrantLock();
// === Dual Queue Architecture ===
// Running queue: stores all unfinished jobs (PENDING or executing), no size limit
// These jobs are actively being processed by the scheduler
private final Map<Long, Deque<AbstractJob>> dbIdToRunningJobs = new ConcurrentHashMap<>();
private final ReentrantLock runningJobLock = new ReentrantLock();
// History queue: stores finished jobs (FINISHED, CANCELLED), FIFO with size limit
// Used for SHOW BACKUP/RESTORE commands to display historical jobs
private final Map<Long, Deque<AbstractJob>> dbIdToHistoryJobs = new ConcurrentHashMap<>();
private final ReentrantLock historyJobLock = new ReentrantLock();
// Legacy single queue (kept for backward compatibility during migration)
// TODO: Remove this after confirming dual queue works correctly
private final Map<Long, Deque<AbstractJob>> dbIdToBackupOrRestoreJobs = new HashMap<>();
// this lock is used for handling one backup or restore request at a time.
private ReentrantLock seqlock = new ReentrantLock();
// Set by init() once the backup/restore staging directories are usable.
private boolean isInit = false;
private Env env;
// map to store backup info, key is label name, value is the BackupJob
// this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();
// === Concurrency control fields (only used when enable_table_level_backup_concurrency is true) ===
// Execution gate: only jobs in this set are allowed to execute
// Jobs in PENDING state but not in this set will skip execution until activated
private final Set<Long> allowedJobIds = Collections.synchronizedSet(new HashSet<>());
// Restore table conflict detection: tracks which tables are currently being restored
// Key: dbId, Value: set of table names being restored
// Used to detect conflicts between restore jobs operating on the same tables
private final Map<Long, Set<String>> restoringTables = new ConcurrentHashMap<>();
// Database job statistics cache for O(1) concurrency checking
// Key: dbId, Value: statistics for that database
private final Map<Long, DatabaseJobStats> dbJobStats = new ConcurrentHashMap<>();
// Global count of outstanding snapshot tasks across all databases;
// rebuilt from the running queues after FE restart.
private final AtomicInteger globalSnapshotTasks = new AtomicInteger(0);
// Label index for O(1) duplicate label detection
// Key: dbId, Value: (label -> jobId) mapping
private final Map<Long, Map<String, Long>> labelIndex = new ConcurrentHashMap<>();
// No-arg constructor used only by the persistence/deserialization path;
// env must be injected via setEnv() before use.
public BackupHandler() {
// for persist
}
// Normal constructor: registers this daemon under the name "backupHandler"
// with the configured scheduling interval.
public BackupHandler(Env env) {
super("backupHandler", Config.backup_handler_update_interval_millis);
this.env = env;
}
// Inject the catalog environment (used by the persist-path constructor).
public void setEnv(Env env) {
this.env = env;
}
@Override
public synchronized void start() {
// env must have been set (either by constructor or setEnv) before starting.
Preconditions.checkNotNull(env);
super.start();
// The repository manager daemon runs alongside this handler.
repoMgr.start();
}
// Accessor for the repository manager.
public RepositoryMgr getRepoMgr() {
return repoMgr;
}
// Current global count of outstanding snapshot tasks.
public int getGlobalSnapshotTasks() {
return globalSnapshotTasks.get();
}
// Adjust the global snapshot task counter; count may be negative to decrement.
public void addGlobalSnapshotTasks(int count) {
globalSnapshotTasks.addAndGet(count);
}
/**
 * Lazily create the local backup/restore staging directories and, in
 * table-level-concurrency mode, rebuild in-memory state after an FE restart.
 *
 * @return true on success; false if either directory cannot be created or
 *         exists but is not a directory (the caller will retry later).
 */
private boolean init() {
    if (!ensureDir(BACKUP_ROOT_DIR, "backup") || !ensureDir(RESTORE_ROOT_DIR, "restore")) {
        return false;
    }
    isInit = true;
    // Rebuild concurrency control state after FE restart
    if (Config.enable_table_level_backup_concurrency) {
        rebuildConcurrencyStateAfterRestart();
    }
    return true;
}

/**
 * Ensure {@code dir} exists as a directory, creating it (and parents) if missing.
 * Using mkdirs() followed by isDirectory() avoids the check-then-act race of the
 * exists()/mkdirs() pattern: even if another thread or process creates the
 * directory concurrently, the final state check decides the outcome.
 *
 * @param dir  directory path to create/verify
 * @param kind short tag ("backup"/"restore") used in the warning message
 * @return true if the path is now an existing directory
 */
private static boolean ensureDir(Path dir, String kind) {
    File f = new File(dir.toString());
    if (!f.mkdirs() && !f.isDirectory()) {
        // Either creation failed or the path exists but is a regular file.
        LOG.warn("failed to create or access " + kind + " dir: " + dir);
        return false;
    }
    return true;
}
/**
 * Rebuild concurrency control state after FE restart.
 * This method scans all jobs and rebuilds the internal data structures including dual queues.
 * Called from init() only when enable_table_level_backup_concurrency is true; jobs are
 * sourced from the legacy single queue (dbIdToBackupOrRestoreJobs) populated during replay.
 */
private void rebuildConcurrencyStateAfterRestart() {
LOG.info("Rebuilding concurrency control state and dual queues after FE restart...");
// Clear all caches and queues
allowedJobIds.clear();
dbJobStats.clear();
labelIndex.clear();
restoringTables.clear();
dbIdToRunningJobs.clear();
dbIdToHistoryJobs.clear();
int totalJobs = 0;
int runningJobs = 0;
int historyJobs = 0;
int activatedJobs = 0;
// Scan all databases and jobs to rebuild dual queues
for (Map.Entry<Long, Deque<AbstractJob>> entry : dbIdToBackupOrRestoreJobs.entrySet()) {
long dbId = entry.getKey();
for (AbstractJob job : entry.getValue()) {
totalJobs++;
boolean isBackup = job instanceof BackupJob;
boolean isFullDatabase = false;
// Determine if this is a full database operation
// NOTE(review): the `instanceof` checks below are redundant given `isBackup`
// already encodes the type; kept as-is.
if (isBackup && job instanceof BackupJob) {
isFullDatabase = ((BackupJob) job).getTableRefs().isEmpty();
} else if (!isBackup && job instanceof RestoreJob) {
isFullDatabase = ((RestoreJob) job).getTableRefs().isEmpty();
}
// Step 1: Add to appropriate queue (running or history)
if (job.isDone()) {
// Completed job -> history queue
addToHistoryQueue(dbId, job);
historyJobs++;
} else {
// Active job (PENDING or running) -> running queue
try {
Deque<AbstractJob> runningQueue = dbIdToRunningJobs.computeIfAbsent(
dbId, k -> new LinkedList<>());
runningQueue.addLast(job);
runningJobs++;
} catch (Exception e) {
// NOTE(review): only the message is logged, the stack trace is dropped.
LOG.warn("Failed to add job {} to running queue during rebuild: {}",
job.getLabel(), e.getMessage());
}
// Step 2: Rebuild concurrency control statistics
onJobCreated(dbId, job, isBackup, isFullDatabase);
// Step 3: For jobs that were already running (not PENDING),
// re-add to allowedJobIds and rebuild restoringTables
boolean isPending = false;
if (isBackup && job instanceof BackupJob) {
isPending = ((BackupJob) job).getState() == BackupJob.BackupJobState.PENDING;
} else if (!isBackup && job instanceof RestoreJob) {
isPending = ((RestoreJob) job).getState() == RestoreJob.RestoreJobState.PENDING;
}
if (!isPending) {
// Job was running, add to allowedJobIds and update running counters
allowedJobIds.add(job.getJobId());
onJobActivated(dbId, job, isBackup, isFullDatabase);
activatedJobs++;
// For running restore jobs, rebuild restoringTables
if (job instanceof RestoreJob) {
addRestoringTables((RestoreJob) job);
}
}
}
}
}
// Try to activate PENDING jobs
for (Long dbId : dbIdToRunningJobs.keySet()) {
tryActivatePendingJobs(dbId);
}
// Recompute the global snapshot-task counter from the rebuilt running queues.
int globalTotal = 0;
for (Deque<AbstractJob> queue : dbIdToRunningJobs.values()) {
for (AbstractJob j : queue) {
globalTotal += j.getSnapshotTaskCount();
}
}
globalSnapshotTasks.set(globalTotal);
LOG.info("Rebuilt dual queues after restart: {} total jobs ({} running, {} history), "
+ "{} activated, {} databases, globalSnapshotTasks={}",
totalJobs, runningJobs, historyJobs, activatedJobs,
Math.max(dbIdToRunningJobs.size(), dbIdToHistoryJobs.size()),
globalTotal);
}
// Returns the database's current backup/restore job; delegates to getCurrentJob.
public AbstractJob getJob(long dbId) {
return getCurrentJob(dbId);
}
/**
 * Returns all backup/restore jobs of the database whose label matches the predicate.
 * In dual-queue mode the running and history queues are merged (via getAllJobs);
 * in legacy mode the single queue is read under jobLock.
 */
public List<AbstractJob> getJobs(long dbId, Predicate<String> predicate) {
    if (!Config.enable_table_level_backup_concurrency) {
        // Legacy mode: snapshot the single queue while holding jobLock.
        jobLock.lock();
        try {
            List<AbstractJob> matched = new ArrayList<>();
            for (AbstractJob job : dbIdToBackupOrRestoreJobs.getOrDefault(dbId, new LinkedList<>())) {
                if (predicate.test(job.getLabel())) {
                    matched.add(job);
                }
            }
            return matched;
        } finally {
            jobLock.unlock();
        }
    }
    // Dual-queue mode: merge both queues and filter by label.
    List<AbstractJob> matched = new ArrayList<>();
    for (AbstractJob job : getAllJobs(dbId)) {
        if (predicate.test(job.getLabel())) {
            matched.add(job);
        }
    }
    return matched;
}
@Override
protected void runAfterCatalogReady() {
    // Lazily initialize staging directories; skip this cycle until init succeeds.
    if (!isInit && !init()) {
        return;
    }
    // Select the job set according to the concurrency mode:
    // dual-queue mode drives only jobs in the running queue, legacy mode the old list.
    List<AbstractJob> jobs = Config.enable_table_level_backup_concurrency
            ? getAllRunningJobs()
            : getAllCurrentJobs();
    for (AbstractJob job : jobs) {
        // Finished jobs need no driving; skipping avoids a no-op run() call.
        if (job.isDone()) {
            continue;
        }
        job.setEnv(env);
        job.run();
    }
}
/**
 * Handle CREATE REPOSITORY: validate the broker (for BROKER storage), build the
 * remote file system, register the repository, and verify connectivity.
 *
 * @throws DdlException if the broker is missing, registration fails, or ping fails.
 */
public void createRepository(CreateRepositoryCommand command) throws DdlException {
    // A BROKER-type repository requires the named broker to be registered.
    boolean brokerKnown = env.getBrokerMgr().containsBroker(command.getBrokerName());
    if (!brokerKnown && command.getStorageType() == StorageBackend.StorageType.BROKER) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "broker does not exist: " + command.getBrokerName());
    }
    RemoteFileSystem fileSystem = FileSystemFactory.get(command.getStorageProperties());
    long repoId = env.getNextId();
    Repository repo = new Repository(repoId, command.getName(), command.isReadOnly(), command.getLocation(),
            fileSystem);
    Status st = repoMgr.addAndInitRepoIfNotExist(repo, false);
    if (!st.ok()) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Failed to create repository: " + st.getErrMsg());
    }
    //fixme why ping again? it has pinged in addAndInitRepoIfNotExist
    if (!repo.ping()) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Failed to create repository: failed to connect to the repo");
    }
}
/**
 * Alters an existing repository by applying the given new properties.
 * Holds the sequencing lock for the whole operation so that alter/drop/submit
 * requests do not interleave.
 *
 * @param repoName The name of the repository to alter.
 * @param newProps The new properties to apply to the repository.
 * @throws DdlException if the repository does not exist, fails to apply properties, or cannot connect
 *         to the updated repository.
 */
public void alterRepository(String repoName, Map<String, String> newProps)
        throws DdlException {
    tryLock();
    try {
        Repository existing = repoMgr.getRepo(repoName);
        if (existing == null) {
            throw new DdlException("Repository does not exist");
        }
        // Build a replacement file system from old + new properties (new keys win).
        Map<String, String> mergedProps = mergeProperties(existing, newProps);
        RemoteFileSystem fileSystem = FileSystemFactory.get(StorageProperties.createPrimary(mergedProps));
        // Same identity (id/name/readonly/location), new storage settings.
        Repository replacement = new Repository(
                existing.getId(), existing.getName(), existing.isReadOnly(),
                existing.getLocation(), fileSystem
        );
        // Reject the change if the repository is unreachable with the new settings.
        if (!replacement.ping()) {
            LOG.warn("Failed to connect repository {}. msg: {}", repoName, replacement.getErrorMsg());
            throw new DdlException("Repository ping failed with new properties");
        }
        // Persist the new repository metadata.
        Status st = repoMgr.alterRepo(replacement, false /* not replay */);
        if (!st.ok()) {
            throw new DdlException("Failed to alter repository: " + st.getErrMsg());
        }
        // Point all unfinished jobs using this repository at the new instance.
        updateOngoingJobs(existing.getId(), replacement);
    } finally {
        seqlock.unlock();
    }
}
/**
 * Merges new user-provided properties into the existing repository's configuration.
 * Keys in {@code newProps} override keys already present on the repository.
 *
 * @param repo The existing repository.
 * @param newProps New user-specified properties.
 * @return A complete set of merged properties.
 */
private Map<String, String> mergeProperties(Repository repo, Map<String, String> newProps) {
    Map<String, String> merged = new HashMap<>();
    merged.putAll(repo.getRemoteFileSystem().getProperties());
    merged.putAll(newProps);
    return merged;
}
/**
 * Updates all currently running jobs associated with the given repository ID.
 * Used to ensure that all jobs operate on the new repository instance after alteration.
 *
 * @param repoId The ID of the altered repository.
 * @param newRepo The new repository instance.
 */
private void updateOngoingJobs(long repoId, Repository newRepo) {
    // Candidate set depends on the concurrency mode.
    List<AbstractJob> candidates = Config.enable_table_level_backup_concurrency
            ? getAllRunningJobs()
            : getAllCurrentJobs();
    for (AbstractJob job : candidates) {
        // Only unfinished jobs that reference the altered repository are rewired.
        if (job.isDone() || job.getRepoId() != repoId) {
            continue;
        }
        job.updateRepo(newRepo);
    }
}
/**
 * Handle DROP REPOSITORY: refuses to drop while any unfinished job still
 * references the repository; otherwise removes it from the manager.
 *
 * @throws DdlException if the repository does not exist, is in use, or removal fails.
 */
public void dropRepository(String repoName) throws DdlException {
    tryLock();
    try {
        Repository repo = repoMgr.getRepo(repoName);
        if (repo == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
        }
        // Candidate set depends on the concurrency mode.
        List<AbstractJob> candidates = Config.enable_table_level_backup_concurrency
                ? getAllRunningJobs()
                : getAllCurrentJobs();
        for (AbstractJob job : candidates) {
            if (!job.isDone() && job.getRepoId() == repo.getId()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Backup or restore job is running on this repository."
                                + " Can not drop it");
            }
        }
        Status st = repoMgr.removeRepo(repo.getName(), false /* not replay */);
        if (!st.ok()) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Failed to drop repository: " + st.getErrMsg());
        }
    } finally {
        seqlock.unlock();
    }
}
/**
 * Handle a BACKUP command: validate mode and repository, resolve the database,
 * then submit the backup under the sequencing lock.
 *
 * @throws DdlException on cloud mode, missing repository/database, or concurrency rejection.
 */
public void process(BackupCommand command) throws DdlException {
    // BACKUP is not available in cloud mode at all.
    if (Config.isCloudMode()) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "BACKUP are not supported by the cloud mode yet");
    }
    // Resolve the target repository unless the snapshot is kept on local storage.
    String repoName = command.getRepoName();
    Repository repository = null;
    if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
        repository = repoMgr.getRepo(repoName);
        if (repository == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Repository " + repoName + " does not exist");
        }
    }
    Database db = env.getInternalCatalog().getDbOrDdlException(command.getDbName());
    // Serialize request handling: at most one submit operation at a time.
    // tryLock() gives up after a bounded wait instead of blocking indefinitely.
    tryLock();
    try {
        if (!Config.enable_table_level_backup_concurrency) {
            // Legacy mode: a database may run only one backup/restore job at a time.
            AbstractJob running = getCurrentJob(db.getId());
            if (running != null && !running.isDone()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Can only run one backup or restore job of a database at same time "
                                + ", current running: label = " + running.getLabel() + " jobId = "
                                + running.getJobId() + ", to run label = " + command.getLabel());
            }
        }
        // Concurrent mode defers its checks to backup(), where the job id is known.
        backup(repository, db, command);
    } finally {
        seqlock.unlock();
    }
}
/**
 * Handle a RESTORE command: validate mode and repository, resolve the database,
 * then submit the restore under the sequencing lock.
 *
 * @throws DdlException on unsupported cloud mode, missing repository/database,
 *         or concurrency rejection.
 */
public void process(RestoreCommand command) throws DdlException {
    // In cloud mode, restore is gated behind an experimental config switch.
    if (Config.isCloudMode() && !Config.enable_cloud_restore_job) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Restore is an experimental feature in cloud mode. Set config "
                        + "`experimental_enable_cloud_restore_job` = `true` to enable.");
    }
    // Resolve the source repository unless restoring from a local snapshot.
    String repoName = command.getRepoName();
    Repository repository = null;
    if (!repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {
        repository = repoMgr.getRepo(repoName);
        if (repository == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Repository " + repoName + " does not exist");
        }
    }
    Database db = env.getInternalCatalog().getDbOrDdlException(command.getDbName());
    // Serialize request handling; bounded wait, see tryLock().
    tryLock();
    try {
        if (!Config.enable_table_level_backup_concurrency) {
            // Legacy mode: a database may run only one backup/restore job at a time.
            AbstractJob running = getCurrentJob(db.getId());
            if (running != null && !running.isDone()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Can only run one backup or restore job of a database at same time "
                                + ", current running: label = " + running.getLabel() + " jobId = "
                                + running.getJobId() + ", to run label = " + command.getLabel());
            }
        }
        // Concurrent mode performs its checks inside restore(), once jobInfo is known.
        restore(repository, db, command);
    } finally {
        seqlock.unlock();
    }
}
/**
 * Acquire the request-sequencing lock (seqlock), waiting up to 10 seconds.
 * Callers must release it via {@code seqlock.unlock()} in a finally block.
 *
 * @throws DdlException if the lock cannot be acquired in time or the thread is interrupted.
 */
private void tryLock() throws DdlException {
    try {
        if (!seqlock.tryLock(10, TimeUnit.SECONDS)) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Another backup or restore job"
                    + " is being submitted. Please wait and try again");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers/executors can observe the interruption;
        // the original code swallowed it, losing the interrupt status.
        Thread.currentThread().interrupt();
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Got interrupted exception when "
                + "try locking. Try again");
    }
}
// Create and submit a backup job for the given database.
// Steps: take a barrier commit seq, determine the table set (explicit list,
// exclude list, or whole db), pre-check table types/partitions, verify the
// label is unused in the repository, then log the job via EditLog and enqueue
// it. Caller must hold seqlock (see process(BackupCommand)).
private void backup(Repository repository, Database db, BackupCommand command) throws DdlException {
if (repository != null && repository.isReadOnly()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository " + repository.getName()
+ " is read only");
}
long commitSeq = 0;
Set<String> tableNames = Sets.newHashSet();
List<TableRefInfo> tableRefInfos = command.getTableRefInfos();
// Obtain the snapshot commit seq, any creating table binlog will be visible.
db.readLock();
try {
BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
commitSeq = env.getEditLog().logBarrier(log);
// Determine the tables to be backed up
if (tableRefInfos.isEmpty()) {
tableNames = db.getTableNames();
} else if (command.isExclude()) {
tableNames = db.getTableNames();
for (TableRefInfo tableRefInfo : tableRefInfos) {
if (!tableNames.remove(tableRefInfo.getTableNameInfo().getTbl())) {
LOG.info("exclude table " + tableRefInfo.getTableNameInfo().getTbl()
+ " of backup stmt is not exists in db " + db.getFullName());
}
}
}
} finally {
db.readUnlock();
}
// Build the final table-ref list: either the explicit refs from the command,
// or one synthesized ref per table name collected above.
List<TableRefInfo> tableRefInfoList = Lists.newArrayList();
if (!tableRefInfos.isEmpty() && !command.isExclude()) {
tableRefInfoList.addAll(tableRefInfos);
} else {
for (String tableName : tableNames) {
TableRefInfo tableRefInfo = new TableRefInfo(new TableNameInfo(db.getFullName(), tableName),
null,
null,
null,
new ArrayList<>(),
null,
null,
new ArrayList<>());
tableRefInfoList.add(tableRefInfo);
}
}
// Check if backup objects are valid
// This is just a pre-check to avoid most of invalid backup requests.
// Also calculate the signature for incremental backup check.
List<TableRefInfo> tblRefInfosNotSupport = Lists.newArrayList();
for (TableRefInfo tableRef : tableRefInfoList) {
String tblName = tableRef.getTableNameInfo().getTbl();
Table tbl = db.getTableOrDdlException(tblName);
// filter the table types which are not supported by local backup.
if (repository == null && tbl.getType() != TableType.OLAP
&& tbl.getType() != TableType.VIEW && tbl.getType() != TableType.MATERIALIZED_VIEW) {
tblRefInfosNotSupport.add(tableRef);
continue;
}
// Views, ODBC tables and MVs need no partition/tablet checks.
if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC
|| tbl.getType() == TableType.MATERIALIZED_VIEW) {
continue;
}
if (tbl.getType() != TableType.OLAP) {
if (Config.ignore_backup_not_support_table_type) {
LOG.warn("Table '{}' is a {} table, can not backup and ignore it."
+ "Only OLAP(Doris)/ODBC/VIEW table can be backed up",
tblName, tbl.getType().toString());
tblRefInfosNotSupport.add(tableRef);
continue;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_NOT_OLAP_TABLE, tblName);
}
}
if (tbl.isTemporary()) {
if (Config.ignore_backup_not_support_table_type || tableRefInfoList.size() > 1) {
LOG.warn("Table '{}' is a temporary table, can not backup and ignore it."
+ "Only OLAP(Doris)/ODBC/VIEW table can be backed up",
Util.getTempTableDisplayName(tblName));
tblRefInfosNotSupport.add(tableRef);
continue;
} else {
ErrorReport.reportDdlException("Table " + Util.getTempTableDisplayName(tblName)
+ " is a temporary table, do not support backup");
}
}
OlapTable olapTbl = (OlapTable) tbl;
tbl.readLock();
try {
if (!Config.ignore_backup_tmp_partitions && olapTbl.existTempPartitions()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Do not support backup table " + olapTbl.getName() + " with temp partitions");
}
PartitionNamesInfo partitionNames = tableRef.getPartitionNamesInfo();
if (partitionNames != null) {
if (!Config.ignore_backup_tmp_partitions && partitionNames.isTemp()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Do not support backup temp partitions in table "
+ tableRef.getTableNameInfo().getTbl());
}
for (String partName : partitionNames.getPartitionNames()) {
Partition partition = olapTbl.getPartition(partName);
if (partition == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Unknown partition " + partName + " in table" + tblName);
}
}
}
} finally {
tbl.readUnlock();
}
}
tableRefInfoList.removeAll(tblRefInfosNotSupport);
// Check if label already be used
long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
if (repository != null) {
List<String> existSnapshotNames = Lists.newArrayList();
Status st = repository.listSnapshots(existSnapshotNames);
if (!st.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, st.getErrMsg());
}
if (existSnapshotNames.contains(command.getLabel())) {
if (command.getBackupType() == BackupCommand.BackupType.FULL) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Snapshot with name '"
+ command.getLabel() + "' already exist in repository");
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
+ "incremental backup");
}
}
repoId = repository.getId();
}
// Create a backup job
BackupJob backupJob = new BackupJob(command.getLabel(), db.getId(),
db.getFullName(),
tableRefInfoList, command.getTimeoutMs(), command.getContent(), env, repoId, commitSeq);
// === Concurrency control ===
if (Config.enable_table_level_backup_concurrency) {
// NOTE(review): -1 appears to be the "unassigned jobId" sentinel — confirm in AbstractJob.
if (backupJob.getJobId() == -1) {
backupJob.setJobId(env.getNextId());
}
// Determine if this is a full database backup
boolean isFullDatabase = tableRefInfoList.isEmpty()
|| (command.getTableRefInfos().isEmpty() && !command.isExclude());
// Collect table names for concurrency check
List<String> tableNameList = new ArrayList<>();
for (TableRefInfo tblRef : tableRefInfoList) {
tableNameList.add(tblRef.getTableNameInfo().getTbl());
}
// Check concurrency (throws exception for hard rejection cases)
checkConcurrency(db.getId(), command.getLabel(), true, isFullDatabase, tableNameList);
// Update statistics
onJobCreated(db.getId(), backupJob, true, isFullDatabase);
// Decide whether to add to allowedJobIds (canActivate checks running jobs)
if (canActivate(db.getId(), backupJob, true, isFullDatabase)) {
allowedJobIds.add(backupJob.getJobId());
onJobActivated(db.getId(), backupJob, true, isFullDatabase);
LOG.info("Backup job [{}] can execute immediately (jobId={})",
backupJob.getLabel(), backupJob.getJobId());
} else {
LOG.info("Backup job [{}] will be pending (jobId={})",
backupJob.getLabel(), backupJob.getJobId());
}
}
// write log (after jobId is assigned so EditLog records the correct jobId)
env.getEditLog().logBackupJob(backupJob);
// Add to appropriate queue based on concurrency mode
if (Config.enable_table_level_backup_concurrency) {
addActiveJob(db.getId(), backupJob);
} else {
addBackupOrRestoreJob(db.getId(), backupJob);
}
LOG.info("finished to submit backup job: {}", backupJob);
}
// Create and submit a restore job for the given database, either from a local
// snapshot (command.isLocal()) or from a remote repository snapshot.
// NOTE(review): in the non-local branch `repository` is dereferenced; callers
// must ensure a repository was resolved for non-local restores.
// Fix in this revision: the local branch resolved `metaVersion` from jobInfo
// when the command left it at -1, but then discarded it and passed
// command.getMetaVersion() to the RestoreJob constructor; the resolved value
// is now actually used.
public void restore(Repository repository, Database db, RestoreCommand command) throws DdlException {
BackupJobInfo jobInfo;
if ((command.isLocal() || command.isAtomicRestore() || command.reserveColocate() || command.isForceReplace())
&& Config.isCloudMode()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "not supported now.");
}
if (command.isLocal()) {
// Local restore: the job info comes with the command and must carry extra info.
jobInfo = command.getJobInfo();
if (jobInfo.extraInfo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info empty");
}
if (jobInfo.extraInfo.beNetworkMap == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info be network map");
}
if (Strings.isNullOrEmpty(jobInfo.extraInfo.token)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Invalid job extra info token");
}
} else {
// Check if snapshot exist in repository
List<BackupJobInfo> infos = Lists.newArrayList();
Status status = repository.getSnapshotInfoFile(command.getLabel(), command.getBackupTimestamp(), infos);
if (!status.ok()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to get info of snapshot '" + command.getLabel() + "' because: "
+ status.getErrMsg() + ". Maybe specified wrong backup timestamp");
}
// Check if all restore objects are exist in this snapshot.
// Also remove all unrelated objs
Preconditions.checkState(infos.size() == 1);
jobInfo = infos.get(0);
}
checkAndFilterRestoreObjsExistInSnapshot(jobInfo, command);
// Create a restore job
RestoreJob restoreJob;
if (command.isLocal()) {
// Fall back to the snapshot's meta version when the command does not specify one.
int metaVersion = command.getMetaVersion();
if (metaVersion == -1) {
metaVersion = jobInfo.metaVersion;
}
BackupMeta backupMeta = command.getMeta();
String backupTimestamp = TimeUtils.longToTimeString(
jobInfo.getBackupTime(), TimeUtils.getDatetimeFormatWithHyphenWithTimeZone());
restoreJob = new RestoreJob(command.getLabel(), backupTimestamp,
db.getId(), db.getFullName(), jobInfo, command.allowLoad(), command.getReplicaAlloc(),
command.getTimeoutMs(), metaVersion, command.reserveReplica(), command.reserveColocate(),
command.reserveDynamicPartitionEnable(), command.isBeingSynced(), command.isCleanTables(),
command.isCleanPartitions(), command.isAtomicRestore(), command.isForceReplace(),
env, Repository.KEEP_ON_LOCAL_REPO_ID, backupMeta);
} else {
if (Config.isCloudMode()) {
restoreJob = new CloudRestoreJob(command.getLabel(), command.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, command.allowLoad(), command.getReplicaAlloc(),
command.getTimeoutMs(), command.getMetaVersion(), command.reserveReplica(),
command.reserveDynamicPartitionEnable(), command.isBeingSynced(), command.isCleanTables(),
command.isCleanPartitions(), command.isAtomicRestore(), command.isForceReplace(),
env, repository.getId(), command.getStorageVaultName());
} else {
restoreJob = new RestoreJob(command.getLabel(), command.getBackupTimestamp(),
db.getId(), db.getFullName(), jobInfo, command.allowLoad(), command.getReplicaAlloc(),
command.getTimeoutMs(), command.getMetaVersion(), command.reserveReplica(),
command.reserveColocate(), command.reserveDynamicPartitionEnable(), command.isBeingSynced(),
command.isCleanTables(), command.isCleanPartitions(), command.isAtomicRestore(),
command.isForceReplace(), env, repository.getId());
}
}
// Set tableRefs for concurrency control
restoreJob.setTableRefs(command.getTableRefInfos());
// === Concurrency control ===
if (Config.enable_table_level_backup_concurrency) {
if (restoreJob.getJobId() == -1) {
restoreJob.setJobId(env.getNextId());
}
// Determine if this is a full database restore
boolean isFullDatabase = command.getTableRefInfos().isEmpty();
// Collect table names for concurrency check
List<String> tableNameList = new ArrayList<>();
for (TableRefInfo tblRef : command.getTableRefInfos()) {
tableNameList.add(tblRef.getTableNameInfo().getTbl());
}
// Check concurrency (throws exception for hard rejection cases)
checkConcurrency(db.getId(), command.getLabel(), false, isFullDatabase, tableNameList);
// Update statistics
onJobCreated(db.getId(), restoreJob, false, isFullDatabase);
// Decide whether to add to allowedJobIds (canActivate checks running jobs)
if (canActivate(db.getId(), restoreJob, false, isFullDatabase)) {
allowedJobIds.add(restoreJob.getJobId());
onJobActivated(db.getId(), restoreJob, false, isFullDatabase);
// CRITICAL: Add restoring tables atomically with activation
// to prevent TOCTOU race condition where another restore job
// targeting the same table could also pass canActivate()
addRestoringTables(restoreJob);
LOG.info("Restore job [{}] can execute immediately (jobId={})",
restoreJob.getLabel(), restoreJob.getJobId());
} else {
LOG.info("Restore job [{}] will be pending (jobId={})",
restoreJob.getLabel(), restoreJob.getJobId());
}
}
// write log (after jobId is assigned so EditLog records the correct jobId)
env.getEditLog().logRestoreJob(restoreJob);
// Add to appropriate queue based on concurrency mode
if (Config.enable_table_level_backup_concurrency) {
addActiveJob(db.getId(), restoreJob);
} else {
addBackupOrRestoreJob(db.getId(), restoreJob);
}
LOG.info("finished to submit restore job: {}", restoreJob);
}
/**
 * Add a job to the running queue of its database.
 * The queue is bounded: reaching the soft limit only logs a warning, while
 * reaching the hard limit rejects the job with a DdlException. A job whose
 * jobId is already present (e.g. during FE restart replay) is silently skipped.
 *
 * @param dbId Database ID
 * @param job The job to add
 * @throws DdlException if the running queue hard limit is exceeded
 */
private void addActiveJob(long dbId, AbstractJob job) throws DdlException {
    runningJobLock.lock();
    try {
        Deque<AbstractJob> queue = dbIdToRunningJobs.computeIfAbsent(dbId, k -> new LinkedList<>());
        // Check soft limit (warning only)
        if (queue.size() >= Config.max_backup_restore_running_queue_soft_limit) {
            LOG.warn("Running queue for database {} has reached soft limit: {} jobs (limit: {})",
                    dbId, queue.size(), Config.max_backup_restore_running_queue_soft_limit);
        }
        // Check hard limit (reject)
        if (queue.size() >= Config.max_backup_restore_running_queue_hard_limit) {
            throw new DdlException(String.format(
                    "Running queue is full (%d jobs, hard limit: %d). "
                            + "Too many active backup/restore jobs for database %d. "
                            + "Please wait for some jobs to complete or increase "
                            + "max_backup_restore_running_queue_hard_limit.",
                    queue.size(), Config.max_backup_restore_running_queue_hard_limit, dbId));
        }
        // Check if job already exists in queue (skip duplicate)
        // This can happen during FE restart replay
        boolean exists = queue.stream().anyMatch(j -> j.getJobId() == job.getJobId());
        if (exists) {
            LOG.info("Job {} already exists in running queue, skipping duplicate (dbId={})",
                    job.getLabel(), dbId);
            return;
        }
        queue.addLast(job);
        if (Config.enable_backup_concurrency_logging) {
            LOG.info("Added job {} to running queue, dbId={}, queueSize={}",
                    job.getLabel(), dbId, queue.size());
        }
    } finally {
        runningJobLock.unlock();
    }
}
/**
 * Transfer a completed job out of the running queue and into the history queue.
 * The history queue applies FIFO eviction once it reaches its size limit.
 *
 * @param dbId Database ID
 * @param job The completed job
 */
private void moveToHistory(long dbId, AbstractJob job) {
    // Drop it from the running queue first, then archive it.
    removeFromRunningQueue(dbId, job);
    addToHistoryQueue(dbId, job);
}
/**
 * Remove a job (matched by jobId) from the database's running queue.
 * The map entry for the database is discarded once its queue becomes empty.
 *
 * @param dbId Database ID
 * @param job The job to remove
 */
private void removeFromRunningQueue(long dbId, AbstractJob job) {
    runningJobLock.lock();
    try {
        Deque<AbstractJob> runningQueue = dbIdToRunningJobs.get(dbId);
        if (runningQueue == null) {
            return;
        }
        // The job may sit anywhere in the deque, so scan with an iterator.
        for (Iterator<AbstractJob> it = runningQueue.iterator(); it.hasNext();) {
            if (it.next().getJobId() == job.getJobId()) {
                it.remove();
                if (Config.enable_backup_concurrency_logging) {
                    LOG.info("Removed job {} from running queue, dbId={}, remainingSize={}",
                            job.getLabel(), dbId, runningQueue.size());
                }
                break;
            }
        }
        // Avoid leaking empty queues in the map.
        if (runningQueue.isEmpty()) {
            dbIdToRunningJobs.remove(dbId);
        }
    } finally {
        runningJobLock.unlock();
    }
}
/**
 * Add a finished job to the history queue with FIFO cleanup when the size
 * limit ({@code Config.max_backup_restore_job_num_per_db}) is reached.
 * Evicted jobs get their resources released via cleanupJobResources().
 * A finished local-snapshot backup is also registered in the local repo.
 *
 * @param dbId Database ID
 * @param job The finished job to add
 */
private void addToHistoryQueue(long dbId, AbstractJob job) {
    historyJobLock.lock();
    try {
        Deque<AbstractJob> queue = dbIdToHistoryJobs.computeIfAbsent(dbId, k -> new LinkedList<>());
        // FIFO cleanup: evict oldest entries until there is room for this job
        while (queue.size() >= Config.max_backup_restore_job_num_per_db) {
            AbstractJob removed = queue.removeFirst();
            // release any local snapshot held by the evicted job
            cleanupJobResources(removed);
            if (Config.enable_backup_concurrency_logging) {
                LOG.info("Removed oldest history job {} from queue (limit reached), dbId={}",
                        removed.getLabel(), dbId);
            }
        }
        queue.addLast(job);
        // Save snapshot to local repo if applicable
        if (job instanceof BackupJob) {
            BackupJob backupJob = (BackupJob) job;
            if (backupJob.isLocalSnapshot()) {
                addSnapshot(backupJob.getLabel(), backupJob);
            }
        }
        if (Config.enable_backup_concurrency_logging) {
            LOG.info("Added job {} to history queue, dbId={}, queueSize={}",
                    job.getLabel(), dbId, queue.size());
        }
    } finally {
        historyJobLock.unlock();
    }
}
/**
 * Release resources held by a job being evicted from history.
 * Currently this only unregisters local snapshots created by local backups.
 *
 * @param job The job to clean up
 */
private void cleanupJobResources(AbstractJob job) {
    if (!(job instanceof BackupJob)) {
        return;
    }
    BackupJob backupJob = (BackupJob) job;
    if (backupJob.isLocalSnapshot()) {
        removeSnapshot(backupJob.getLabel());
    }
    // Additional cleanup can be added here if needed
}
/**
 * Get all jobs for a database, merging the running and history queues.
 * Used for SHOW BACKUP/RESTORE commands. Each queue is snapshotted under
 * its own lock; the two locks are never held at the same time.
 *
 * @param dbId Database ID
 * @return List of all jobs (running + history)
 */
private List<AbstractJob> getAllJobs(long dbId) {
    List<AbstractJob> merged = new ArrayList<>();
    // Snapshot the running queue under its own lock.
    runningJobLock.lock();
    try {
        Deque<AbstractJob> runningQueue = dbIdToRunningJobs.get(dbId);
        if (runningQueue != null) {
            merged.addAll(runningQueue);
        }
    } finally {
        runningJobLock.unlock();
    }
    // Then append the history queue, again under its own lock.
    historyJobLock.lock();
    try {
        Deque<AbstractJob> historyQueue = dbIdToHistoryJobs.get(dbId);
        if (historyQueue != null) {
            merged.addAll(historyQueue);
        }
    } finally {
        historyJobLock.unlock();
    }
    return merged;
}
/**
 * Get all running jobs across all databases.
 * Used by the scheduler in runAfterCatalogReady().
 *
 * @return List of all running jobs
 */
private List<AbstractJob> getAllRunningJobs() {
    runningJobLock.lock();
    try {
        List<AbstractJob> snapshot = new ArrayList<>();
        dbIdToRunningJobs.values().forEach(snapshot::addAll);
        return snapshot;
    } finally {
        runningJobLock.unlock();
    }
}
/**
 * Legacy single-queue mode: append a job to the per-database job deque.
 * Oldest entries are evicted once the deque reaches
 * {@code Config.max_backup_restore_job_num_per_db}; a non-positive limit
 * disables job tracking entirely. If the last entry is a PENDING job or has
 * the same jobId as the incoming one, it is replaced so edit-log replay
 * keeps only the latest state of a job. Local snapshots of evicted backups
 * are unregistered, and a finished local backup being (re)added is
 * registered in the local repo (image reload path).
 *
 * @param dbId Database ID
 * @param job  The job to append
 */
private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
    // If there are too many backup/restore jobs, it may cause OOM. If the job num option is set to 0,
    // skip all backup/restore jobs.
    if (Config.max_backup_restore_job_num_per_db <= 0) {
        return;
    }
    List<String> removedLabels = Lists.newArrayList();
    jobLock.lock();
    try {
        Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
        while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
            AbstractJob removedJob = jobs.removeFirst();
            // remember evicted local snapshots; they are unregistered after the lock is released
            if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
                removedLabels.add(removedJob.getLabel());
            }
        }
        AbstractJob lastJob = jobs.peekLast();
        // Remove duplicate jobs and keep only the latest status
        // Otherwise, the tasks that have been successfully executed will be repeated when replaying edit log.
        if (lastJob != null && (lastJob.isPending() || lastJob.getJobId() == job.getJobId())) {
            jobs.removeLast();
        }
        jobs.addLast(job);
    } finally {
        jobLock.unlock();
    }
    if (job.isFinished() && job instanceof BackupJob) {
        // Save snapshot to local repo, when reload backupHandler from image.
        BackupJob backupJob = (BackupJob) job;
        if (backupJob.isLocalSnapshot()) {
            addSnapshot(backupJob.getLabel(), backupJob);
        }
    }
    // unregister snapshots of evicted jobs (done outside jobLock)
    for (String label : removedLabels) {
        removeSnapshot(label);
    }
}
/**
 * Legacy single-queue mode: collect the latest (last) job of every database
 * that has a non-empty job deque.
 */
private List<AbstractJob> getAllCurrentJobs() {
    jobLock.lock();
    try {
        List<AbstractJob> latest = new ArrayList<>();
        for (Deque<AbstractJob> jobs : dbIdToBackupOrRestoreJobs.values()) {
            if (CollectionUtils.isNotEmpty(jobs)) {
                latest.add(jobs.getLast());
            }
        }
        return latest;
    } finally {
        jobLock.unlock();
    }
}
/**
 * Get the most recent job of a database, or null if the database has none.
 * In dual-queue mode this is the last job of the running queue; in legacy
 * mode it is the last job of the single per-db queue.
 *
 * @param dbId Database ID
 * @return the latest job, or null
 */
private AbstractJob getCurrentJob(long dbId) {
    if (Config.enable_table_level_backup_concurrency) {
        // In dual queue mode, get the last job from running queue
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(dbId);
            return (jobs != null && !jobs.isEmpty()) ? jobs.getLast() : null;
        } finally {
            runningJobLock.unlock();
        }
    } else {
        // Legacy mode: use old single queue
        jobLock.lock();
        try {
            // Plain get() instead of getOrDefault(dbId, Lists.newLinkedList()):
            // the old form allocated a throwaway LinkedList on every miss.
            Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.get(dbId);
            return (jobs == null || jobs.isEmpty()) ? null : jobs.getLast();
        } finally {
            jobLock.unlock();
        }
    }
}
/**
 * Validate the table references of a RESTORE statement against the snapshot
 * meta and narrow the meta down to the requested objects.
 * Three cases:
 * <ul>
 *   <li>no table refs: restore everything in the snapshot (no filtering)</li>
 *   <li>EXCLUDE refs: drop each listed table from the snapshot meta
 *       (tables missing from the snapshot are ignored with a log;
 *       aliases are rejected)</li>
 *   <li>include refs: every listed table must exist in the snapshot;
 *       OLAP tables get partition-level validation, then the meta is
 *       retained to exactly the listed OLAP tables / views / ODBC tables</li>
 * </ul>
 *
 * @param jobInfo snapshot meta to validate and filter in place
 * @param command the RESTORE command carrying the table refs
 * @throws DdlException if an included table or partition is missing, or an
 *         alias is used in an EXCLUDE clause
 */
private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo jobInfo,
        RestoreCommand command)
        throws DdlException {
    // case1: all table in job info
    if (command.getTableRefInfos().isEmpty()) {
        return;
    }
    // case2: exclude table ref
    if (command.isExclude()) {
        for (TableRefInfo tableRefInfo : command.getTableRefInfos()) {
            String tblName = tableRefInfo.getTableNameInfo().getTbl();
            TableType tableType = jobInfo.getTypeByTblName(tblName);
            if (tableType == null) {
                // missing excluded table is harmless: nothing to remove
                LOG.info("Ignore error : exclude table " + tblName + " does not exist in snapshot "
                        + jobInfo.name);
                continue;
            }
            if (tableRefInfo.hasAlias()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "The table alias in exclude clause does not make sense");
            }
            jobInfo.removeTable(tableRefInfo, tableType);
        }
        return;
    }
    // case3: include table ref
    Set<String> olapTableNames = Sets.newHashSet();
    Set<String> viewNames = Sets.newHashSet();
    Set<String> odbcTableNames = Sets.newHashSet();
    for (TableRefInfo tableRefInfo : command.getTableRefInfos()) {
        String tblName = tableRefInfo.getTableNameInfo().getTbl();
        TableType tableType = jobInfo.getTypeByTblName(tblName);
        if (tableType == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Table " + tblName + " does not exist in snapshot " + jobInfo.name);
        }
        switch (tableType) {
            case OLAP:
                // also validates the selected partitions of the OLAP table
                checkAndFilterRestoreOlapTableExistInSnapshot(jobInfo.backupOlapTableObjects, tableRefInfo);
                olapTableNames.add(tblName);
                break;
            case VIEW:
                viewNames.add(tblName);
                break;
            case ODBC:
                odbcTableNames.add(tblName);
                break;
            default:
                break;
        }
        // set alias
        if (tableRefInfo.hasAlias()) {
            jobInfo.setAlias(tblName, tableRefInfo.getTableAlias());
        }
    }
    // keep only the requested objects in the snapshot meta
    jobInfo.retainOlapTables(olapTableNames);
    jobInfo.retainView(viewNames);
    jobInfo.retainOdbcTables(odbcTableNames);
}
/**
 * Validate that an OLAP table (and any selected partitions) from a RESTORE
 * statement exists in the snapshot, then narrow the snapshot meta of that
 * table down to the requested partitions.
 *
 * @param backupOlapTableInfoMap snapshot meta: table name -> backed-up table info
 * @param tableRefInfo the table reference from the RESTORE statement
 * @throws DdlException if the table or one of its selected partitions is not
 *         in the snapshot, or if temporary partitions are requested
 */
public void checkAndFilterRestoreOlapTableExistInSnapshot(Map<String, BackupOlapTableInfo> backupOlapTableInfoMap,
        TableRefInfo tableRefInfo) throws DdlException {
    String tblName = tableRefInfo.getTableNameInfo().getTbl();
    BackupOlapTableInfo tblInfo = backupOlapTableInfoMap.get(tblName);
    if (tblInfo == null) {
        // Fail with a clear error instead of an NPE below: this method is
        // public and may be called without a prior table-existence check.
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Table " + tblName + " does not exist in snapshot");
    }
    PartitionNamesInfo partitionNamesInfo = tableRefInfo.getPartitionNamesInfo();
    if (partitionNamesInfo != null) {
        if (partitionNamesInfo.isTemp()) {
            ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                    "Do not support restoring temporary partitions in table " + tblName);
        }
        // check the selected partitions
        for (String partName : partitionNamesInfo.getPartitionNames()) {
            if (!tblInfo.containsPart(partName)) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Partition " + partName + " of table " + tblName
                                + " does not exist in snapshot");
            }
        }
    }
    // only retain restore partitions (null means keep all)
    tblInfo.retainPartitions(partitionNamesInfo == null ? null : partitionNamesInfo.getPartitionNames());
}
/**
 * Handle CANCEL BACKUP/RESTORE.
 * In table-level concurrency mode, all running jobs of the requested type
 * (optionally filtered by label) are cancelled; in legacy mode only the
 * latest job of the database is considered.
 *
 * @param command the cancel command (db name, job type, optional label filter)
 * @throws DdlException if no matching job is running, or some jobs fail to cancel
 */
public void cancel(CancelBackupCommand command) throws DdlException {
    String dbName = command.getDbName();
    String labelFilter = command.getLabel(); // null means "cancel regardless of label"
    Database db = env.getInternalCatalog().getDbOrDdlException(dbName);
    List<AbstractJob> jobsToCancel = new ArrayList<>();
    // In concurrency mode, find all running jobs that match the criteria
    if (Config.enable_table_level_backup_concurrency) {
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(db.getId());
            if (jobs != null) {
                for (AbstractJob job : jobs) {
                    // Check if job type matches
                    boolean typeMatch = (job instanceof BackupJob && !command.isRestore())
                            || (job instanceof RestoreJob && command.isRestore());
                    // Check if label matches (using CancelBackupCommand.matchesLabel())
                    boolean labelMatch = command.matchesLabel(job.getLabel());
                    if (typeMatch && labelMatch && !job.isDone()) {
                        jobsToCancel.add(job);
                    }
                }
            }
        } finally {
            runningJobLock.unlock();
        }
    } else {
        // Legacy mode: use getCurrentJob()
        AbstractJob job = getCurrentJob(db.getId());
        if (job != null) {
            boolean typeMatch = (job instanceof BackupJob && !command.isRestore())
                    || (job instanceof RestoreJob && command.isRestore());
            boolean labelMatch = command.matchesLabel(job.getLabel());
            // Skip finished jobs for consistency with the concurrency-mode branch:
            // cancelling an already-done job should report "no job running"
            // instead of a confusing cancel failure.
            if (typeMatch && labelMatch && !job.isDone()) {
                jobsToCancel.add(job);
            }
        }
    }
    // Error message based on whether label was specified
    if (jobsToCancel.isEmpty()) {
        String errorMsg;
        if (labelFilter != null) {
            errorMsg = String.format("No %s job with label '%s' is currently running",
                    command.isRestore() ? "restore" : "backup",
                    labelFilter);
        } else {
            errorMsg = String.format("No %s job is currently running",
                    command.isRestore() ? "restore" : "backup");
        }
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, errorMsg);
    }
    // Cancel all matching jobs
    List<String> cancelledLabels = new ArrayList<>();
    List<String> failedLabels = new ArrayList<>();
    for (AbstractJob job : jobsToCancel) {
        Status status = job.cancel();
        if (status.ok()) {
            cancelledLabels.add(job.getLabel());
        } else {
            failedLabels.add(job.getLabel() + "(" + status.getErrMsg() + ")");
            LOG.warn("Failed to cancel {} job {}: {}",
                    command.isRestore() ? "restore" : "backup",
                    job.getLabel(), status.getErrMsg());
        }
    }
    if (!failedLabels.isEmpty()) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Failed to cancel some jobs: " + String.join(", ", failedLabels));
    }
    // Log with more details
    if (labelFilter != null) {
        LOG.info("Cancelled {} {} job(s) matching label '{}': {}",
                cancelledLabels.size(),
                command.isRestore() ? "restore" : "backup",
                labelFilter,
                String.join(", ", cancelledLabels));
    } else {
        LOG.info("Cancelled {} {} job(s): {}",
                cancelledLabels.size(),
                command.isRestore() ? "restore" : "backup",
                String.join(", ", cancelledLabels));
    }
}
/**
 * Finish-callback for a BE snapshot task. Routes the result to the owning
 * backup or restore job. Returns true when the task should be removed from
 * the AgentTaskQueue (handled, stale, or mismatched).
 */
public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
    AbstractJob job = null;
    if (Config.enable_table_level_backup_concurrency) {
        // Concurrency mode: locate the job by its id among the db's running jobs.
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(task.getDbId());
            if (jobs != null) {
                job = jobs.stream()
                        .filter(j -> j.getJobId() == task.getJobId())
                        .findFirst()
                        .orElse(null);
            }
        } finally {
            runningJobLock.unlock();
        }
    } else {
        // Legacy mode: only the latest job of the database can be active.
        job = getCurrentJob(task.getDbId());
        if (job != null && job.getJobId() != task.getJobId()) {
            LOG.warn("invalid snapshot task: {}, job id: {}, task job id: {}",
                    task, job.getJobId(), task.getJobId());
            return true;
        }
    }
    if (job == null) {
        LOG.warn("failed to find backup or restore job for task: {} (jobId={})", task, task.getJobId());
        // return true to remove this task from AgentTaskQueue
        return true;
    }
    if (job instanceof BackupJob) {
        if (task.isRestoreTask()) {
            LOG.warn("expect finding restore job, but get backup job {} for task: {}", job, task);
            // return true to remove this task from AgentTaskQueue
            return true;
        }
        return ((BackupJob) job).finishTabletSnapshotTask(task, request);
    }
    if (task.isRestoreTask()) {
        return ((RestoreJob) job).finishTabletSnapshotTask(task, request);
    }
    LOG.warn("expect finding backup job, but get restore job {} for task: {}", job, task);
    // return true to remove this task from AgentTaskQueue
    return true;
}
/**
 * Finish-callback for a BE snapshot upload task. Upload tasks only belong
 * to backup jobs in UPLOADING state; anything else is logged and rejected.
 */
public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) {
    AbstractJob job = null;
    if (Config.enable_table_level_backup_concurrency) {
        // Concurrency mode: locate the job by its id among the db's running jobs.
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(task.getDbId());
            if (jobs != null) {
                job = jobs.stream()
                        .filter(j -> j.getJobId() == task.getJobId())
                        .findFirst()
                        .orElse(null);
            }
        } finally {
            runningJobLock.unlock();
        }
    } else {
        job = getCurrentJob(task.getDbId());
    }
    if (job == null || (job instanceof RestoreJob)) {
        LOG.info("invalid upload task: {}, no backup job is found. db id: {}", task, task.getDbId());
        return false;
    }
    BackupJob backupJob = (BackupJob) job;
    boolean idMismatch = backupJob.getJobId() != task.getJobId();
    if (idMismatch || backupJob.getState() != BackupJobState.UPLOADING) {
        LOG.info("invalid upload task: {}, job id: {}, job state: {}",
                task, backupJob.getJobId(), backupJob.getState().name());
        return false;
    }
    return backupJob.finishSnapshotUploadTask(task, request);
}
/**
 * Finish-callback for a BE snapshot download task. Routes the result to the
 * owning restore job. Returns true when the task should be removed from the
 * AgentTaskQueue (handled, stale, or mismatched).
 */
public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest request) {
    AbstractJob job = null;
    if (Config.enable_table_level_backup_concurrency) {
        // Concurrency mode: locate the job by its id among the db's running jobs.
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(task.getDbId());
            if (jobs != null) {
                job = jobs.stream()
                        .filter(j -> j.getJobId() == task.getJobId())
                        .findFirst()
                        .orElse(null);
            }
        } finally {
            runningJobLock.unlock();
        }
    } else {
        // Legacy mode: only the latest job of the database can be active.
        job = getCurrentJob(task.getDbId());
        if (job != null && job.getJobId() != task.getJobId()) {
            LOG.warn("invalid download task: {}, job id: {}, task job id: {}",
                    task, job.getJobId(), task.getJobId());
            return true;
        }
    }
    if (!(job instanceof RestoreJob)) {
        LOG.warn("failed to find restore job for task: {} (jobId={})", task, task.getJobId());
        // return true to remove this task from AgentTaskQueue
        return true;
    }
    return ((RestoreJob) job).finishTabletDownloadTask(task, request);
}
/**
 * Finish-callback for a BE directory-move task (final restore step).
 * Routes the result to the owning restore job. Returns true when the task
 * should be removed from the AgentTaskQueue (handled, stale, or mismatched).
 */
public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) {
    AbstractJob job = null;
    if (Config.enable_table_level_backup_concurrency) {
        // Concurrency mode: locate the job by its id among the db's running jobs.
        runningJobLock.lock();
        try {
            Deque<AbstractJob> jobs = dbIdToRunningJobs.get(task.getDbId());
            if (jobs != null) {
                job = jobs.stream()
                        .filter(j -> j.getJobId() == task.getJobId())
                        .findFirst()
                        .orElse(null);
            }
        } finally {
            runningJobLock.unlock();
        }
    } else {
        // Legacy mode: only the latest job of the database can be active.
        job = getCurrentJob(task.getDbId());
        if (job != null && job.getJobId() != task.getJobId()) {
            LOG.warn("invalid dir move task: {}, job id: {}, task job id: {}",
                    task, job.getJobId(), task.getJobId());
            return true;
        }
    }
    if (!(job instanceof RestoreJob)) {
        LOG.warn("failed to find restore job for task: {} (jobId={})", task, task.getJobId());
        // return true to remove this task from AgentTaskQueue
        return true;
    }
    return ((RestoreJob) job).finishDirMoveTask(task, request);
}
/**
 * Replay a backup/restore job record from the edit log (non-master FE and
 * startup replay). For a cancelled or progressed record, the existing
 * in-memory job of the same database is updated; the replayed job object
 * itself is then enqueued into the running/history queue (dual-queue mode)
 * or the legacy single queue.
 *
 * NOTE(review): getCurrentJob() only inspects the last job of the db's
 * queue — presumably edit-log replay order guarantees that is the job being
 * updated; confirm for multi-job (concurrency-mode) replay.
 *
 * @param job the job deserialized from the edit log
 */
public void replayAddJob(AbstractJob job) {
    LOG.info("replay backup/restore job: {}", job);
    if (job.isCancelled()) {
        AbstractJob existingJob = getCurrentJob(job.getDbId());
        if (existingJob == null || existingJob.isDone()) {
            // nothing to cancel: the in-memory state does not match the log record
            LOG.error("invalid existing job: {}. current replay job is: {}",
                    existingJob, job);
            return;
        }
        existingJob.setEnv(env);
        existingJob.replayCancel();
    } else if (!job.isPending()) {
        AbstractJob existingJob = getCurrentJob(job.getDbId());
        if (existingJob == null || existingJob.isDone()) {
            LOG.error("invalid existing job: {}. current replay job is: {}",
                    existingJob, job);
            return;
        }
        // We use replayed job, not the existing job, to do the replayRun().
        // Because if we use the existing job to run again,
        // for example: In restore job, PENDING will transfer to SNAPSHOTING, not DOWNLOAD.
        job.setEnv(env);
        job.replayRun();
    }
    // Add to appropriate queue based on concurrency mode and job state
    if (Config.enable_table_level_backup_concurrency) {
        // In dual queue mode, decide based on job state
        if (job.isDone()) {
            // Completed job goes to history queue
            addToHistoryQueue(job.getDbId(), job);
        } else {
            // Active job (PENDING or running) goes to running queue
            try {
                addActiveJob(job.getDbId(), job);
            } catch (DdlException e) {
                // replay must not fail; the job is dropped from the queue with a warning
                LOG.warn("Failed to add job {} to running queue during replay: {}",
                        job.getLabel(), e.getMessage());
            }
        }
    } else {
        // Legacy mode: use old single queue
        addBackupOrRestoreJob(job.getDbId(), job);
    }
}
/**
 * Record task progress reported by a BE. Matches the report against the
 * active jobs: backup jobs accept UPLOAD progress, restore jobs accept
 * DOWNLOAD progress. Returns true when some job consumed the report.
 */
public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, int totalNum) {
    // Pick the candidate set according to the concurrency mode.
    List<AbstractJob> candidates = Config.enable_table_level_backup_concurrency
            ? getAllRunningJobs()
            : getAllCurrentJobs();
    for (AbstractJob job : candidates) {
        if (job.isDone() || job.getJobId() != jobId) {
            continue;
        }
        boolean matches = (job.getType() == JobType.BACKUP && type == TTaskType.UPLOAD)
                || (job.getType() == JobType.RESTORE && type == TTaskType.DOWNLOAD);
        if (matches) {
            job.taskProgress.put(taskId, Pair.of(finishedNum, totalNum));
            return true;
        }
    }
    return false;
}
/**
 * Register a finished local backup job as a named local snapshot.
 * Guarded by the localSnapshots read-write lock.
 *
 * @param labelName backup label, used as the snapshot name
 * @param backupJob the finished backup job holding the snapshot meta
 */
public void addSnapshot(String labelName, BackupJob backupJob) {
    // callers must only register finished backups
    assert backupJob.isFinished();
    LOG.info("add snapshot {} to local repo", labelName);
    localSnapshotsLock.writeLock().lock();
    try {
        localSnapshots.put(labelName, backupJob);
    } finally {
        localSnapshotsLock.writeLock().unlock();
    }
}
/**
 * Unregister a local snapshot by label. No-op if the label is unknown.
 * Guarded by the localSnapshots read-write lock.
 *
 * @param labelName backup label of the snapshot to remove
 */
public void removeSnapshot(String labelName) {
    LOG.info("remove snapshot {} from local repo", labelName);
    localSnapshotsLock.writeLock().lock();
    try {
        localSnapshots.remove(labelName);
    } finally {
        localSnapshotsLock.writeLock().unlock();
    }
}
/**
 * Look up a local snapshot by label.
 *
 * @param labelName backup label of the snapshot
 * @return the snapshot meta, or null if no such snapshot is registered
 */
public Snapshot getSnapshot(String labelName) {
    BackupJob backupJob;
    localSnapshotsLock.readLock().lock();
    try {
        backupJob = localSnapshots.get(labelName);
    } finally {
        localSnapshotsLock.readLock().unlock();
    }
    return backupJob == null ? null : backupJob.getSnapshot();
}
/**
 * Serialize the repository manager and all backup/restore jobs to an image.
 * In dual-queue mode the running and history queues are merged (each
 * snapshotted under its own lock); in legacy mode the single per-db queue
 * is flattened. Format: repoMgr, then job count, then each job.
 *
 * @param out the image output stream
 * @throws IOException on write failure
 */
@Override
public void write(DataOutput out) throws IOException {
    repoMgr.write(out);
    // Collect jobs based on concurrency mode
    List<AbstractJob> jobs;
    if (Config.enable_table_level_backup_concurrency) {
        // In dual queue mode, merge running and history queues
        jobs = new ArrayList<>();
        runningJobLock.lock();
        try {
            for (Deque<AbstractJob> queue : dbIdToRunningJobs.values()) {
                jobs.addAll(queue);
            }
        } finally {
            runningJobLock.unlock();
        }
        historyJobLock.lock();
        try {
            for (Deque<AbstractJob> queue : dbIdToHistoryJobs.values()) {
                jobs.addAll(queue);
            }
        } finally {
            historyJobLock.unlock();
        }
    } else {
        // Legacy mode: use old single queue
        jobs = dbIdToBackupOrRestoreJobs.values()
                .stream().flatMap(Deque::stream).collect(Collectors.toList());
    }
    // count-prefixed job list
    out.writeInt(jobs.size());
    for (AbstractJob job : jobs) {
        job.write(out);
    }
}
/**
 * Deserialize handler state from an image: the repository manager followed
 * by a count-prefixed list of jobs.
 *
 * @param in the image input stream
 * @throws IOException on read failure
 */
public void readFields(DataInput in) throws IOException {
    repoMgr = RepositoryMgr.read(in);
    // Note: addBackupOrRestoreJob will be replaced by replayAddJob during normal operation
    // This is just for initial loading, and the actual queue assignment will happen in
    // rebuildConcurrencyStateAfterRestart()
    int remaining = in.readInt();
    while (remaining > 0) {
        AbstractJob job = AbstractJob.read(in);
        addBackupOrRestoreJob(job.getDbId(), job);
        remaining--;
    }
}
// === Concurrency Control Methods ===
/**
 * Check if a job has execution permission.
 * Called by BackupJob/RestoreJob to determine if they can proceed with execution.
 *
 * @param jobId the job ID to check
 * @return true if the job is allowed to execute, false otherwise
 */
public boolean canExecute(long jobId) {
    // When table-level concurrency is disabled every job may run;
    // otherwise the job must have been granted a slot in allowedJobIds.
    return !Config.enable_table_level_backup_concurrency || allowedJobIds.contains(jobId);
}
/**
 * Called when a job is activated (added to allowedJobIds) to update running counters.
 * This enables O(1) canActivate checks.
 *
 * @param dbId database ID
 * @param job the activated job
 * @param isBackup true for backup jobs, false for restore jobs
 * @param isFullDatabase true if this is a full database operation
 */
private void onJobActivated(long dbId, AbstractJob job, boolean isBackup, boolean isFullDatabase) {
    DatabaseJobStats stats = dbJobStats.computeIfAbsent(dbId, k -> new DatabaseJobStats());
    if (!isBackup) {
        stats.runningRestores++;
    } else {
        stats.runningBackups++;
        if (isFullDatabase) {
            // remember which job holds the exclusive full-database backup slot
            stats.runningBackupDatabaseJobId = job.getJobId();
        }
    }
    if (Config.enable_backup_concurrency_logging) {
        LOG.info("Job activated: dbId={}, label={}, runningBackups={}, runningRestores={}",
                dbId, job.getLabel(), stats.runningBackups, stats.runningRestores);
    }
}
/**
 * Called when a job is deactivated (removed from allowedJobIds) to update running counters.
 *
 * @param dbId database ID
 * @param job the deactivated job
 * @param isBackup true for backup jobs, false for restore jobs
 */
private void onJobDeactivated(long dbId, AbstractJob job, boolean isBackup) {
    DatabaseJobStats stats = dbJobStats.get(dbId);
    if (stats == null) {
        return; // no bookkeeping exists for this database
    }
    if (!isBackup) {
        stats.runningRestores--;
    } else {
        stats.runningBackups--;
        // release the full-database backup slot if this job owned it
        if (Long.valueOf(job.getJobId()).equals(stats.runningBackupDatabaseJobId)) {
            stats.runningBackupDatabaseJobId = null;
        }
    }
    // return the job's snapshot tasks to the global budget
    int taskCount = job.getSnapshotTaskCount();
    if (taskCount > 0) {
        globalSnapshotTasks.addAndGet(-taskCount);
    }
    if (Config.enable_backup_concurrency_logging) {
        LOG.info("Job deactivated: dbId={}, label={}, runningBackups={}, runningRestores={}, "
                + "globalSnapshotTasks={}",
                dbId, job.getLabel(), stats.runningBackups, stats.runningRestores,
                globalSnapshotTasks.get());
    }
}
/**
 * Concurrency check for new backup/restore job submissions.
 * This method only performs hard rejection checks. Soft checks (whether to PENDING)
 * are handled by canActivate(). The counters it reads (activeBackups,
 * activeRestores, backupDatabaseJobId) are maintained by onJobCreated()
 * and reset in onJobCompleted().
 *
 * NOTE(review): reads labelIndex/dbJobStats without holding a lock —
 * presumably guarded by the caller's submission path; confirm.
 *
 * @param dbId database ID
 * @param label job label
 * @param isBackup true for backup jobs, false for restore jobs
 * @param isFullDatabase true if this is a full database backup/restore (no specific tables)
 * @param tableNames list of table names involved (empty for full database operations)
 * @throws DdlException if job should be rejected
 */
private void checkConcurrency(long dbId, String label, boolean isBackup,
        boolean isFullDatabase, List<String> tableNames) throws DdlException {
    // === Step 1: O(1) Label duplicate check ===
    Map<String, Long> labels = labelIndex.get(dbId);
    if (labels != null && labels.containsKey(label)) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Label already exists: " + label);
    }
    // === Step 2: O(1) Get statistics ===
    DatabaseJobStats stats = dbJobStats.get(dbId);
    if (stats == null) {
        return; // No jobs, no rejection needed
    }
    int totalActive = stats.activeBackups + stats.activeRestores;
    // === Rule 1: Concurrency limit (hard reject) ===
    if (totalActive >= Config.max_backup_restore_concurrent_num_per_db) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Concurrency limit reached (" + totalActive + " active jobs).\n"
                        + "Active jobs: " + String.join(", ", stats.activeLabels) + "\n"
                        + "Limit: " + Config.max_backup_restore_concurrent_num_per_db);
    }
    // === Rule 2-A: Queue has backup_database -> reject all new backups ===
    if (isBackup && stats.backupDatabaseJobId != null) {
        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                "Cannot submit backup job while full database backup exists.\n"
                        + "Full database backup: " + stats.backupDatabaseLabel + " (running or pending)\n"
                        + "Suggestion: Wait for it to complete, or cancel it first.");
    }
    // Other rules (backup_database waiting, restore conflicts, backup/restore mutual exclusion)
    // are soft rules handled by canActivate() - they cause PENDING, not rejection
}
/**
 * Called when a job is created to update statistics.
 *
 * @param dbId database ID
 * @param job the created job
 * @param isBackup true for backup jobs, false for restore jobs
 * @param isFullDatabase true if this is a full database operation
 */
private void onJobCreated(long dbId, AbstractJob job, boolean isBackup, boolean isFullDatabase) {
    DatabaseJobStats stats = dbJobStats.computeIfAbsent(dbId, k -> new DatabaseJobStats());
    if (!isBackup) {
        stats.activeRestores++;
    } else {
        stats.activeBackups++;
        if (isFullDatabase) {
            // remember the full-database backup so later backup submissions are rejected
            stats.backupDatabaseJobId = job.getJobId();
            stats.backupDatabaseLabel = job.getLabel();
        }
    }
    stats.activeLabels.add(job.getLabel());
    // index label -> jobId for O(1) duplicate-label checks
    Map<String, Long> labels = labelIndex.computeIfAbsent(dbId, k -> new ConcurrentHashMap<>());
    labels.put(job.getLabel(), job.getJobId());
    if (Config.enable_backup_concurrency_logging) {
        LOG.info("Job created: dbId={}, label={}, backups={}, restores={}",
                dbId, job.getLabel(), stats.activeBackups, stats.activeRestores);
    }
}
/**
 * Check if a newly created job can be activated immediately (O(1) version).
 * This is called right after job creation to decide whether to add it to allowedJobIds.
 * A false return means the job stays PENDING, not that it is rejected.
 *
 * NOTE(review): reads dbJobStats/restoringTables without holding a lock —
 * presumably guarded by the caller's submission path; confirm.
 *
 * @param dbId database ID
 * @param job the job to check
 * @param isBackup true for backup jobs, false for restore jobs
 * @param isFullDatabase true if this is a full database operation
 * @return true if job can be activated immediately
 */
private boolean canActivate(long dbId, AbstractJob job, boolean isBackup, boolean isFullDatabase) {
    DatabaseJobStats stats = dbJobStats.get(dbId);
    // O(1) lookup for running job counts
    int runningBackups = (stats != null) ? stats.runningBackups : 0;
    int runningRestores = (stats != null) ? stats.runningRestores : 0;
    boolean hasRunningBackupDatabase = (stats != null) && (stats.runningBackupDatabaseJobId != null);
    // === Rule 1: Concurrency limit ===
    if (runningBackups + runningRestores >= Config.max_backup_restore_concurrent_num_per_db) {
        return false;
    }
    // === Rule 2-A: Running backup_database -> backup cannot activate ===
    if (hasRunningBackupDatabase && isBackup) {
        return false;
    }
    // === Rule 2-B: Job is backup_database -> wait for all backups to complete ===
    if (isBackup && isFullDatabase && runningBackups > 0) {
        return false;
    }
    // === Rule 3: Backup/Restore mutual exclusion ===
    if (isBackup && runningRestores > 0) {
        return false;
    }
    if (!isBackup && runningBackups > 0) {
        return false;
    }
    // === Rule 4: For restore jobs, check table conflicts ===
    if (!isBackup && job instanceof RestoreJob) {
        RestoreJob restoreJob = (RestoreJob) job;
        // Check full database restore conflict
        if (stats != null && stats.restoreDatabaseJobId != null) {
            return false; // Full database restore is running
        }
        // Check if this is a full database restore
        if (isFullDatabase) {
            // a full-db restore needs exclusive access to all tables
            Set<String> currentRestoring = restoringTables.get(dbId);
            if (currentRestoring != null && !currentRestoring.isEmpty()) {
                return false; // Other tables are being restored
            }
        } else {
            // Check table conflicts (O(k) where k = number of tables in the job)
            Set<String> currentRestoring = restoringTables.get(dbId);
            if (currentRestoring != null && !currentRestoring.isEmpty()) {
                for (TableRefInfo tblRef : restoreJob.getTableRefs()) {
                    String tableName = tblRef.getTableNameInfo().getTbl();
                    if (currentRestoring.contains(tableName)) {
                        return false; // Table conflict
                    }
                }
            }
        }
    }
    return true;
}
/**
 * Check if a PENDING job can be activated (O(1) version).
 * Called during tryActivatePendingJobs() to determine which jobs can start running.
 * Mirrors the rules of canActivate(), but derives "full database backup" and
 * "restore" from the job object itself rather than from caller flags.
 *
 * NOTE(review): reads dbJobStats/restoringTables without holding a lock —
 * presumably guarded by the activation loop; confirm.
 *
 * @param dbId database ID
 * @param jobToCheck the job to check for activation
 * @param isBackup true for backup jobs, false for restore jobs
 * @return true if job can be activated
 */
private boolean canActivateJob(long dbId, AbstractJob jobToCheck, boolean isBackup) {
    DatabaseJobStats stats = dbJobStats.get(dbId);
    // Determine job type
    boolean isJobBackupDatabase = false;
    boolean isJobRestore = false;
    if (isBackup && jobToCheck instanceof BackupJob) {
        BackupJob bj = (BackupJob) jobToCheck;
        // a backup with no table refs is a full-database backup
        isJobBackupDatabase = bj.getTableRefs().isEmpty();
    } else {
        // anything that is not a backup is treated as a restore
        isJobRestore = true;
    }
    // O(1) lookup for running job counts
    int runningBackups = (stats != null) ? stats.runningBackups : 0;
    int runningRestores = (stats != null) ? stats.runningRestores : 0;
    boolean hasRunningBackupDatabase = (stats != null) && (stats.runningBackupDatabaseJobId != null);
    // === Rule 1: Concurrency limit ===
    if (runningBackups + runningRestores >= Config.max_backup_restore_concurrent_num_per_db) {
        return false;
    }
    // === Rule 2-A: Running backup_database -> backup cannot activate ===
    if (hasRunningBackupDatabase && isBackup) {
        return false;
    }
    // === Rule 2-B: Job is backup_database -> wait for all backups to complete ===
    if (isJobBackupDatabase && runningBackups > 0) {
        return false;
    }
    // === Rule 3: Backup/Restore mutual exclusion ===
    if (isBackup && runningRestores > 0) {
        return false;
    }
    if (isJobRestore && runningBackups > 0) {
        return false;
    }
    // === Rule 4: For restore jobs, check table conflicts ===
    if (isJobRestore && jobToCheck instanceof RestoreJob) {
        RestoreJob restoreJob = (RestoreJob) jobToCheck;
        // Check full database restore conflict
        if (stats != null && stats.restoreDatabaseJobId != null) {
            return false; // Full database restore is running
        }
        // Check if this is a full database restore
        if (restoreJob.getTableRefs().isEmpty()) {
            // a full-db restore needs exclusive access to all tables
            Set<String> currentRestoring = restoringTables.get(dbId);
            if (currentRestoring != null && !currentRestoring.isEmpty()) {
                return false; // Other tables are being restored
            }
        } else {
            // Check table conflicts (O(k) where k = number of tables in the job)
            Set<String> currentRestoring = restoringTables.get(dbId);
            if (currentRestoring != null && !currentRestoring.isEmpty()) {
                for (TableRefInfo tblRef : restoreJob.getTableRefs()) {
                    String tableName = tblRef.getTableNameInfo().getTbl();
                    if (currentRestoring.contains(tableName)) {
                        return false; // Table conflict
                    }
                }
            }
        }
    }
    return true;
}
    /**
     * Called when a job completes (finished or cancelled) to update statistics
     * and try to activate pending jobs.
     *
     * Order matters here: stats and indexes are cleaned before the execution
     * permission is revoked, and pending-job activation runs last so it sees a
     * fully updated view of the database's job state.
     *
     * @param jobId the completed job ID
     * @param dbId database ID
     * @param job the completed job
     */
    public void onJobCompleted(long jobId, long dbId, AbstractJob job) {
        // No-op in legacy (single queue) mode; concurrency bookkeeping is unused there.
        if (!Config.enable_table_level_backup_concurrency) {
            return;
        }
        boolean isBackup = (job instanceof BackupJob);
        // === 0. Move from running queue to history queue (dual queue architecture) ===
        moveToHistory(dbId, job);
        // === 1. Update statistics (O(1)) ===
        DatabaseJobStats stats = dbJobStats.get(dbId);
        if (stats != null) {
            if (isBackup) {
                stats.activeBackups--;
                // Boxed comparison: jobId is a primitive, the marker field is a Long.
                if (Long.valueOf(jobId).equals(stats.backupDatabaseJobId)) {
                    stats.backupDatabaseJobId = null;
                    stats.backupDatabaseLabel = null;
                }
            } else {
                stats.activeRestores--;
                if (Long.valueOf(jobId).equals(stats.restoreDatabaseJobId)) {
                    stats.restoreDatabaseJobId = null;
                    stats.restoreDatabaseLabel = null;
                }
            }
            stats.activeLabels.remove(job.getLabel());
            // Clean up empty stats
            if (stats.activeBackups == 0 && stats.activeRestores == 0) {
                dbJobStats.remove(dbId);
            }
        }
        // === 2. Update label index ===
        Map<String, Long> labels = labelIndex.get(dbId);
        if (labels != null) {
            labels.remove(job.getLabel());
            if (labels.isEmpty()) {
                labelIndex.remove(dbId);
            }
        }
        // === 3. Remove execution permission and update running counters ===
        // remove() tells us whether the job actually held a slot; only then must the
        // running counters be decremented via onJobDeactivated.
        boolean wasRunning = allowedJobIds.remove(jobId);
        if (wasRunning) {
            onJobDeactivated(dbId, job, isBackup);
        }
        // === 4. Clean up restoringTables for restore jobs ===
        if (job instanceof RestoreJob) {
            removeRestoringTables((RestoreJob) job);
        }
        if (Config.enable_backup_concurrency_logging) {
            // The local `stats` reference is still valid even if the map entry was
            // removed above; the counters logged reflect the post-decrement values.
            LOG.info("Job completed: dbId={}, label={}, activeBackups={}, activeRestores={}, "
                    + "runningBackups={}, runningRestores={}",
                    dbId, job.getLabel(),
                    stats != null ? stats.activeBackups : 0,
                    stats != null ? stats.activeRestores : 0,
                    stats != null ? stats.runningBackups : 0,
                    stats != null ? stats.runningRestores : 0);
        }
        // === 5. Try to activate PENDING jobs ===
        tryActivatePendingJobs(dbId);
    }
/**
* Try to activate pending jobs after a job completes.
* This method scans the job queue and activates jobs that can now run.
*
* @param dbId database ID
*/
private void tryActivatePendingJobs(long dbId) {
// Get jobs to scan based on concurrency mode
Deque<AbstractJob> jobs;
if (Config.enable_table_level_backup_concurrency) {
// In dual queue mode, scan running queue
runningJobLock.lock();
try {
jobs = dbIdToRunningJobs.get(dbId);
if (jobs == null || jobs.isEmpty()) {
return;
}
// Make a copy to avoid concurrent modification
jobs = new LinkedList<>(jobs);
} finally {
runningJobLock.unlock();
}
} else {
// Legacy mode: use old single queue
jobs = dbIdToBackupOrRestoreJobs.get(dbId);
if (jobs == null || jobs.isEmpty()) {
return;
}
}
for (AbstractJob job : jobs) {
// Only process PENDING jobs
if (job.isDone()) {
continue;
}
// Already has execution permission, skip
if (allowedJobIds.contains(job.getJobId())) {
continue;
}
// Check if can activate
boolean isBackup = job instanceof BackupJob;
boolean isFullDatabase = false;
if (isBackup && job instanceof BackupJob) {
isFullDatabase = ((BackupJob) job).getTableRefs().isEmpty();
} else if (!isBackup && job instanceof RestoreJob) {
isFullDatabase = ((RestoreJob) job).getTableRefs().isEmpty();
}
if (canActivateJob(dbId, job, isBackup)) {
allowedJobIds.add(job.getJobId());
onJobActivated(dbId, job, isBackup, isFullDatabase);
// CRITICAL: Add restoring tables atomically with activation
// to prevent TOCTOU race condition
if (job instanceof RestoreJob) {
addRestoringTables((RestoreJob) job);
}
LOG.info("Activated pending job: {} (jobId={})",
job.getLabel(), job.getJobId());
}
}
}
/**
* Add restore job's tables to the restoring set for conflict detection.
*
* @param job the restore job
*/
public void addRestoringTables(RestoreJob job) {
long dbId = job.getDbId();
// Check if this is a full database restore
if (job.getTableRefs().isEmpty()) {
// Full database restore: mark as exclusive
DatabaseJobStats stats = dbJobStats.computeIfAbsent(dbId,
k -> new DatabaseJobStats());
stats.restoreDatabaseJobId = job.getJobId();
stats.restoreDatabaseLabel = job.getLabel();
LOG.info("Full database restore [{}] started, blocking other restores on db [{}]",
job.getLabel(), dbId);
return;
}
// Table-level restore: add table names to set
Set<String> tables = restoringTables.computeIfAbsent(dbId, k -> ConcurrentHashMap.newKeySet());
for (TableRefInfo tblRef : job.getTableRefs()) {
String tableName = tblRef.getTableNameInfo().getTbl();
tables.add(tableName);
if (Config.enable_backup_concurrency_logging) {
LOG.info("Table [{}] added to restoring set, job: [{}]", tableName, job.getLabel());
}
}
}
/**
* Remove restore job's tables from the restoring set when job completes.
*
* @param job the restore job
*/
private void removeRestoringTables(RestoreJob job) {
long dbId = job.getDbId();
// Check if this is a full database restore
if (job.getTableRefs().isEmpty()) {
// Full database restore: clear marker
DatabaseJobStats stats = dbJobStats.get(dbId);
if (stats != null && Long.valueOf(job.getJobId()).equals(stats.restoreDatabaseJobId)) {
stats.restoreDatabaseJobId = null;
stats.restoreDatabaseLabel = null;
}
LOG.info("Full database restore [{}] completed, unblocking other restores",
job.getLabel());
return;
}
// Table-level restore: remove table names
Set<String> tables = restoringTables.get(dbId);
if (tables != null) {
for (TableRefInfo tblRef : job.getTableRefs()) {
String tableName = tblRef.getTableNameInfo().getTbl();
tables.remove(tableName);
if (Config.enable_backup_concurrency_logging) {
LOG.info("Table [{}] removed from restoring set, job: [{}]",
tableName, job.getLabel());
}
}
// Remove key if set is empty
if (tables.isEmpty()) {
restoringTables.remove(dbId);
}
}
}
/**
* Get the block reason for a job (for SHOW command).
*
* @param job the job to check
* @return block reason string, or null if not blocked
*/
public String getJobBlockReason(AbstractJob job) {
if (!Config.enable_table_level_backup_concurrency) {
return null;
}
// Finished/cancelled jobs are not blocked
if (job.isDone()) {
return null;
}
// If job has execution permission, it's not blocked
if (allowedJobIds.contains(job.getJobId())) {
return null;
}
long dbId = job.getDbId();
boolean isBackup = job instanceof BackupJob;
// Check backup/restore mutual exclusion
DatabaseJobStats stats = dbJobStats.get(dbId);
if (stats != null) {
if (isBackup && stats.activeRestores > 0) {
return "Waiting for " + stats.activeRestores + " restore(s) to complete";
}
if (!isBackup && stats.activeBackups > 0) {
return "Waiting for " + stats.activeBackups + " backup(s) to complete";
}
// Check full database backup blocking
if (isBackup && stats.backupDatabaseJobId != null) {
return "Full database backup is running: " + stats.backupDatabaseLabel;
}
}
// Check restore table conflicts
if (job instanceof RestoreJob) {
RestoreJob restoreJob = (RestoreJob) job;
// Check full database restore
if (stats != null && stats.restoreDatabaseJobId != null) {
return "Full database restore is running: " + stats.restoreDatabaseLabel;
}
// Check table conflicts
Set<String> restoring = restoringTables.get(dbId);
if (restoring != null && !restoring.isEmpty()) {
Set<String> conflicts = new HashSet<>();
for (TableRefInfo tblRef : restoreJob.getTableRefs()) {
String tableName = tblRef.getTableNameInfo().getTbl();
if (restoring.contains(tableName)) {
conflicts.add(tableName);
}
}
if (!conflicts.isEmpty()) {
return "Table conflict: " + String.join(", ", conflicts);
}
}
}
return "Waiting for execution slot";
}
/**
* Get the queue position for a job (for SHOW command).
* Returns 0 if job is running, or the 1-based position in the queue.
*
* @param job the job to check
* @return queue position (0 = running, 1+ = position in queue)
*/
public int getJobQueuePosition(AbstractJob job) {
if (!Config.enable_table_level_backup_concurrency) {
return 0;
}
// Finished/cancelled jobs have no queue position
if (job.isDone()) {
return 0;
}
// If job has execution permission, it's running (position 0)
if (allowedJobIds.contains(job.getJobId())) {
return 0;
}
// Count position in pending queue (use dbIdToRunningJobs in concurrency mode)
Deque<AbstractJob> jobs = dbIdToRunningJobs.get(job.getDbId());
if (jobs == null) {
return 0;
}
int position = 0;
for (AbstractJob j : jobs) {
if (j.isDone()) {
continue;
}
if (!allowedJobIds.contains(j.getJobId())) {
position++;
if (j.getJobId() == job.getJobId()) {
return position;
}
}
}
return position;
}
    /**
     * Database job statistics for O(1) concurrency checking.
     * This class maintains counters and markers for each database to avoid
     * traversing the job queue on every submission.
     *
     * NOTE(review): fields are mutated directly by the enclosing handler without
     * internal synchronization; thread-safety appears to rely on the caller —
     * confirm which lock guards these mutations.
     */
    static class DatabaseJobStats {
        // Active job counts (includes PENDING jobs, not just running)
        int activeBackups = 0;
        int activeRestores = 0;
        // Running job counts (only jobs in allowedJobIds, for O(1) canActivate check)
        int runningBackups = 0;
        int runningRestores = 0;
        // Full database backup marker (in queue, pending or running)
        Long backupDatabaseJobId = null; // jobId of the full database backup
        String backupDatabaseLabel = null; // label of the full database backup
        // Running full database backup marker (for canActivate check)
        Long runningBackupDatabaseJobId = null;
        // Full database restore marker (integrated here for unified management)
        Long restoreDatabaseJobId = null; // jobId of the full database restore
        String restoreDatabaseLabel = null; // label of the full database restore
        // All active job labels (for error messages)
        Set<String> activeLabels = new HashSet<>();
        /**
         * Check if there are any active jobs.
         */
        boolean hasActiveJobs() {
            return activeBackups > 0 || activeRestores > 0;
        }
        /**
         * Get total active job count.
         */
        int getTotalActiveJobs() {
            return activeBackups + activeRestores;
        }
        /**
         * Get total running job count.
         */
        int getTotalRunningJobs() {
            return runningBackups + runningRestores;
        }
        // Intentionally omits runningBackupDatabaseJobId and the activeLabels set;
        // this string feeds log lines where the counters and labels suffice.
        @Override
        public String toString() {
            return "DatabaseJobStats{"
                    + "activeBackups=" + activeBackups
                    + ", activeRestores=" + activeRestores
                    + ", runningBackups=" + runningBackups
                    + ", runningRestores=" + runningRestores
                    + ", backupDatabaseLabel=" + backupDatabaseLabel
                    + ", restoreDatabaseLabel=" + restoreDatabaseLabel
                    + '}';
        }
    }
}