MediumDecisionMaker.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.plans.commands.RestoreCommand;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Unified medium decision maker for restore operations.
*
* This class centralizes all storage medium selection logic, replacing scattered
* decision-making across multiple classes (OlapTable, RestoreJob, etc.).
*
* Key responsibilities:
* 1. Determine target storage medium based on restore configuration
* 2. Handle adaptive mode downgrade when preferred medium is unavailable
* 3. Implement "avoid migration" strategy for atomic restore
* 4. Provide complete decision traceability through structured results
*/
public class MediumDecisionMaker {
private static final Logger LOG = LogManager.getLogger(MediumDecisionMaker.class);
private final String storageMedium; // hdd/ssd/same_with_upstream
private final String mediumAllocationMode; // strict/adaptive
/**
* Decision result containing the final medium and decision metadata.
*/
public static class MediumDecision {
private TStorageMedium finalMedium; // Final medium to use
private boolean wasDowngraded; // Whether adaptive downgrade occurred
private TStorageMedium originalMedium; // Original preferred medium
private String reason; // Decision reason for logging
public MediumDecision(TStorageMedium finalMedium, TStorageMedium originalMedium,
boolean wasDowngraded, String reason) {
this.finalMedium = finalMedium;
this.originalMedium = originalMedium;
this.wasDowngraded = wasDowngraded;
this.reason = reason;
}
public TStorageMedium getFinalMedium() {
return finalMedium;
}
public boolean wasDowngraded() {
return wasDowngraded;
}
public TStorageMedium getOriginalMedium() {
return originalMedium;
}
public String getReason() {
return reason;
}
@Override
public String toString() {
return String.format("MediumDecision[final=%s, original=%s, downgraded=%s, reason='%s']",
finalMedium, originalMedium, wasDowngraded, reason);
}
}
public MediumDecisionMaker(String storageMedium, String mediumAllocationMode) {
this.storageMedium = storageMedium;
this.mediumAllocationMode = mediumAllocationMode;
}
/**
* Scenario 1: Decide medium for new partition (non-atomic restore or new table).
*
* Logic:
* 1. Determine original preferred medium from config
* 2. Try to allocate backends with preferred medium
* 3. If adaptive mode and preferred unavailable, downgrade to available medium
*
* @param partitionName Partition name (for logging)
* @param upstreamDataProperty DataProperty from upstream partition
* @param replicaAlloc Replica allocation requirement
* @return Decision result with final medium and metadata
* @throws DdlException If no medium is available (strict mode)
*/
public MediumDecision decideForNewPartition(
String partitionName,
DataProperty upstreamDataProperty,
ReplicaAllocation replicaAlloc) throws DdlException {
// Step 1: Determine original preferred medium
TStorageMedium preferredMedium;
String source;
if (isSameWithUpstream()) {
preferredMedium = upstreamDataProperty.getStorageMedium();
source = "inherited from upstream";
} else {
preferredMedium = getTargetStorageMedium();
source = "user specified: " + storageMedium;
}
// Step 2: Try to allocate with preferred medium
DataProperty.MediumAllocationMode mode = getTargetAllocationMode();
Map<Tag, Integer> nextIndexes = new HashMap<>();
Pair<Map<Tag, List<Long>>, TStorageMedium> result =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, preferredMedium, mode, false);
TStorageMedium actualMedium = result.second;
boolean downgraded = (actualMedium != preferredMedium);
// Step 3: Build decision result
String reason = source;
if (downgraded) {
reason += String.format(", downgraded from %s to %s (preferred medium unavailable)",
preferredMedium, actualMedium);
}
MediumDecision decision = new MediumDecision(actualMedium, preferredMedium, downgraded, reason);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided medium for new partition {}: {} (mode: {})",
partitionName, decision, mode);
}
return decision;
}
/**
* Scenario 2: Decide medium for atomic restore.
*
* Core principle: Prefer local medium to avoid data migration, unless:
* 1. Local medium is truly unavailable (adaptive mode)
* 2. User explicitly specified different medium (non same_with_upstream)
*
* Logic:
* 1. If same_with_upstream + adaptive:
* - Check if local medium is available
* - Available ��� use local (avoid migration)
* - Unavailable ��� use remote/configured (allow migration for availability)
* 2. If same_with_upstream + strict:
* - Force use local medium (avoid migration)
* 3. If hdd/ssd (explicit):
* - Use configured medium (allow migration, respect user config)
*
* @param partitionName Partition name (for logging)
* @param upstreamDataProperty DataProperty from upstream partition
* @param localDataProperty DataProperty from local partition
* @param replicaAlloc Replica allocation requirement
* @return Decision result with final medium and metadata
* @throws DdlException If no medium is available (strict mode)
*/
public MediumDecision decideForAtomicRestore(
String partitionName,
DataProperty upstreamDataProperty,
DataProperty localDataProperty,
ReplicaAllocation replicaAlloc) throws DdlException {
TStorageMedium localMedium = localDataProperty.getStorageMedium();
TStorageMedium upstreamMedium = upstreamDataProperty.getStorageMedium();
// Determine original preferred medium from config
TStorageMedium configuredMedium;
if (isSameWithUpstream()) {
configuredMedium = upstreamMedium;
} else {
configuredMedium = getTargetStorageMedium();
}
DataProperty.MediumAllocationMode mode = getTargetAllocationMode();
// Strategy decision based on config
if (isSameWithUpstream()) {
// same_with_upstream: prefer local medium to avoid migration
if (mode.isAdaptive()) {
// Adaptive: prefer local, allow downgrade if unavailable
return decidePreferLocalMedium(partitionName, localMedium,
configuredMedium, replicaAlloc, mode);
} else {
// Strict: must use local medium (to avoid migration), check availability
return decideWithLocalMediumStrict(partitionName, localMedium,
configuredMedium, replicaAlloc, mode);
}
} else {
// Explicit hdd/ssd: respect user config (allow migration)
return decideWithConfiguredMedium(partitionName, configuredMedium,
localMedium, replicaAlloc, mode);
}
}
/**
* Scenario 3: Decide table-level medium.
*
* Table-level medium is used as the default for future partitions.
*
* @param upstreamTable Table from backup meta
* @return Final table-level medium
*/
public TStorageMedium decideForTableLevel(OlapTable upstreamTable) {
if (isSameWithUpstream()) {
// Inherit from upstream
TStorageMedium medium = upstreamTable.getStorageMedium();
if (LOG.isDebugEnabled()) {
LOG.debug("Table {} preserving upstream table-level medium {} (same_with_upstream)",
upstreamTable.getName(), medium);
}
return medium;
} else {
// Use configured medium
TStorageMedium medium = getTargetStorageMedium();
if (LOG.isDebugEnabled()) {
LOG.debug("Table {} using configured table-level medium {} (storage_medium={})",
upstreamTable.getName(), medium, storageMedium);
}
return medium;
}
}
/**
* Strategy: Use local medium in strict mode (for atomic restore with same_with_upstream).
* Must check availability and throw exception if local medium is unavailable.
*/
private MediumDecision decideWithLocalMediumStrict(
String partitionName,
TStorageMedium localMedium,
TStorageMedium configuredMedium,
ReplicaAllocation replicaAlloc,
DataProperty.MediumAllocationMode mode) throws DdlException {
// In strict mode, we must verify that local medium is available
// If unavailable, throw exception (no fallback allowed in strict mode)
Map<Tag, Integer> nextIndexes = new HashMap<>();
Pair<Map<Tag, List<Long>>, TStorageMedium> result =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, localMedium, mode, false);
TStorageMedium actualMedium = result.second;
if (actualMedium != localMedium) {
// This should not happen in strict mode, but if it does, it's an error
throw new DdlException(String.format(
"Failed to allocate local medium %s for partition %s in strict mode. "
+ "System attempted to use %s instead, but strict mode does not allow fallback.",
localMedium, partitionName, actualMedium));
}
String reason = String.format("atomic restore with strict mode, using local medium "
+ "(avoiding migration, configured=%s, verified available)", configuredMedium);
MediumDecision decision = new MediumDecision(localMedium, configuredMedium,
localMedium != configuredMedium, reason);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided medium for atomic restore partition {}: {} (strategy: local strict, verified)",
partitionName, decision);
}
return decision;
}
/**
* Strategy: Prefer local medium to avoid migration (adaptive mode).
*/
private MediumDecision decidePreferLocalMedium(
String partitionName,
TStorageMedium localMedium,
TStorageMedium configuredMedium,
ReplicaAllocation replicaAlloc,
DataProperty.MediumAllocationMode mode) throws DdlException {
try {
// Check if local medium is available
Map<Tag, Integer> nextIndexes = new HashMap<>();
Pair<Map<Tag, List<Long>>, TStorageMedium> testResult =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, localMedium, mode, true /* check only */);
if (testResult.second == localMedium) {
// Local medium available ��� use it (avoid migration)
String reason = String.format("atomic restore with adaptive mode, prefer local medium "
+ "(avoiding migration, configured=%s)", configuredMedium);
MediumDecision decision = new MediumDecision(localMedium, configuredMedium,
localMedium != configuredMedium, reason);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided medium for atomic restore partition {}: {} "
+ "(strategy: prefer local, available)",
partitionName, decision);
}
return decision;
} else {
// Local medium unavailable ��� use downgraded medium (allow migration)
TStorageMedium downgradedMedium = testResult.second;
String reason = String.format("atomic restore with adaptive mode, local medium %s unavailable, "
+ "using %s (migration needed, configured=%s)",
localMedium, downgradedMedium, configuredMedium);
MediumDecision decision = new MediumDecision(downgradedMedium, configuredMedium,
true, reason);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided medium for atomic restore partition {}: {} "
+ "(strategy: prefer local, but unavailable)",
partitionName, decision);
}
return decision;
}
} catch (DdlException e) {
// Check failed, use conservative strategy: local medium
String reason = String.format("atomic restore, check failed, using local medium %s "
+ "(conservative strategy, configured=%s): %s",
localMedium, configuredMedium, e.getMessage());
MediumDecision decision = new MediumDecision(localMedium, configuredMedium,
localMedium != configuredMedium, reason);
LOG.warn("Decided medium for atomic restore partition {}: {} (strategy: conservative due to error)",
partitionName, decision);
return decision;
}
}
/**
* Strategy: Use configured medium (explicit hdd/ssd, allow migration).
*/
private MediumDecision decideWithConfiguredMedium(
String partitionName,
TStorageMedium configuredMedium,
TStorageMedium localMedium,
ReplicaAllocation replicaAlloc,
DataProperty.MediumAllocationMode mode) throws DdlException {
Map<Tag, Integer> nextIndexes = new HashMap<>();
Pair<Map<Tag, List<Long>>, TStorageMedium> result =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexes, configuredMedium, mode, false);
TStorageMedium actualMedium = result.second;
boolean downgraded = (actualMedium != configuredMedium);
String reason = String.format("atomic restore with explicit medium, using configured medium "
+ "(migration allowed if needed, local=%s, configured=%s)",
localMedium, configuredMedium);
if (downgraded) {
reason += String.format(", downgraded to %s (configured medium unavailable)", actualMedium);
}
MediumDecision decision = new MediumDecision(actualMedium, configuredMedium, downgraded, reason);
if (LOG.isDebugEnabled()) {
LOG.debug("Decided medium for atomic restore partition {}: {} (strategy: use configured, mode={})",
partitionName, decision, mode);
}
return decision;
}
// Helper methods
private boolean isSameWithUpstream() {
return RestoreCommand.STORAGE_MEDIUM_SAME_WITH_UPSTREAM.equals(storageMedium);
}
private TStorageMedium getTargetStorageMedium() {
if (RestoreCommand.STORAGE_MEDIUM_HDD.equals(storageMedium)) {
return TStorageMedium.HDD;
} else if (RestoreCommand.STORAGE_MEDIUM_SSD.equals(storageMedium)) {
return TStorageMedium.SSD;
}
throw new IllegalStateException("getTargetStorageMedium() should not be called "
+ "when storage_medium is 'same_with_upstream'");
}
private DataProperty.MediumAllocationMode getTargetAllocationMode() {
if (RestoreCommand.MEDIUM_ALLOCATION_MODE_STRICT.equals(mediumAllocationMode)) {
return DataProperty.MediumAllocationMode.STRICT;
} else if (RestoreCommand.MEDIUM_ALLOCATION_MODE_ADAPTIVE.equals(mediumAllocationMode)) {
return DataProperty.MediumAllocationMode.ADAPTIVE;
}
// Default to strict
return DataProperty.MediumAllocationMode.STRICT;
}
}