CacheHotspotManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.CloudWarmUpJob.JobType;
import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Triple;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand;
import org.apache.doris.nereids.trees.plans.commands.WarmUpClusterCommand;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
import org.apache.doris.thrift.THotPartition;
import org.apache.doris.thrift.THotTableMessage;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class CacheHotspotManager extends MasterDaemon {
// Presumably an upper bound on rows returned by SHOW statements; it is not referenced in this
// part of the file — TODO confirm against callers.
public static final int MAX_SHOW_ENTRIES = 2000;
private static final Logger LOG = LogManager.getLogger(CacheHotspotManager.class);
// Number of JobDaemon cycles between two clean-ups of finished or cancelled warm-up jobs.
private static final int CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB = 20;
// Used to resolve cluster ids/names and to enumerate the backends of each cluster.
private final CloudSystemInfoService nodeMgr;
// Table id -> table lookup. It is rebuilt by traverseAllDatabaseForTable() and cleared at the end
// of every runAfterCatalogReady() cycle, so it is not kept in memory between cycles.
private Map<Long, Table> idToTable = new HashMap<>();
// Set once CacheHotspotManagerUtils.execCreateCacheTable() has succeeded.
private boolean tableCreated = false;
// Pending rows for the hotspot table; flushed and cleared by triggerBatchInsert().
private List<String> insertValueBatches = new ArrayList<String>();
// Cycle counter of JobDaemon, compared against CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB.
private int cycleCount = 0;
// Helper daemons are created and started lazily on the first runAfterCatalogReady() call;
// each boolean records that its daemon has already been started.
private MasterDaemon jobDaemon;
private boolean startJobDaemon = false;
private MasterDaemon tableFilterRefreshDaemon;
private boolean startTableFilterRefreshDaemon = false;
private MasterDaemon warmUpSyncStatsRefreshDaemon;
private boolean startWarmUpSyncStatsRefreshDaemon = false;
// Thread pool for concurrent BE HTTP requests during on-demand stats collection
private final ExecutorService warmupStatsExecutor = Executors.newFixedThreadPool(16,
new ThreadFactoryBuilder().setNameFormat("warmup-stats-collector-%d").setDaemon(true).build());
// Warm-up jobs keyed by job id. cloudWarmUpJobs is the map consulted for lookups by id and for
// metric refresh; runnableCloudWarmUpJobs is the map scanned for conflict checks and stats
// aggregation. The exact lifecycle of each map is maintained outside this part of the file.
private ConcurrentMap<Long, CloudWarmUpJob> cloudWarmUpJobs = Maps.newConcurrentMap();
private ConcurrentMap<Long, CloudWarmUpJob> activeCloudWarmUpJobs = Maps.newConcurrentMap();
private ConcurrentMap<Long, CloudWarmUpJob> runnableCloudWarmUpJobs = Maps.newConcurrentMap();
// Daemon thread pool for warm-up jobs, sized from Config.max_active_cloud_warm_up_job.
private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true);
/**
 * Identity of a long-running warm-up job, used for duplicate detection.
 * Two keys are equal when source name, destination name, sync mode and table filter
 * expression all match. A {@code null} table filter is normalized to the empty string.
 */
private static class JobKey {
    private final String srcName;
    private final String dstName;
    private final CloudWarmUpJob.SyncMode syncMode;
    private final String tableFilterExpr;

    /** Creates a key without a table filter (stored as the empty string). */
    public JobKey(String srcName, String dstName, CloudWarmUpJob.SyncMode syncMode) {
        this(srcName, dstName, syncMode, "");
    }

    /** Creates a key; a {@code null} {@code tableFilterExpr} is stored as the empty string. */
    public JobKey(String srcName, String dstName, CloudWarmUpJob.SyncMode syncMode, String tableFilterExpr) {
        this.srcName = srcName;
        this.dstName = dstName;
        this.syncMode = syncMode;
        if (tableFilterExpr == null) {
            this.tableFilterExpr = "";
        } else {
            this.tableFilterExpr = tableFilterExpr;
        }
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o instanceof JobKey) {
            JobKey other = (JobKey) o;
            return syncMode == other.syncMode
                    && Objects.equals(srcName, other.srcName)
                    && Objects.equals(dstName, other.dstName)
                    && Objects.equals(tableFilterExpr, other.tableFilterExpr);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(srcName, dstName, syncMode, tableFilterExpr);
    }

    /** Renders the key for error messages; the table filter is appended only when non-empty. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("WarmUpJob src='");
        text.append(srcName).append("', dst='").append(dstName).append("', syncMode=").append(syncMode);
        if (!tableFilterExpr.isEmpty()) {
            text.append(", tableFilter=").append(tableFilterExpr);
        }
        return text.toString();
    }
}
// Tracks long-running jobs (event-driven and periodic).
// Ensures only one non-finished job exists per <source, destination, sync_mode, table_filter> key
// (see JobKey: a missing table filter is treated as the empty string).
private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();
/**
 * Records a non-finished event-driven or periodic job in {@code repeatJobDetectionSet}.
 * Finished jobs are ignored. When not replaying, the cross-type load-event conflict check
 * runs first and a duplicate key is rejected; during replay duplicates are tolerated.
 *
 * @param job the job to register
 * @param replay {@code true} when called while replaying, which disables both rejections
 * @throws AnalysisException if (not replaying) a conflicting or duplicate job already exists
 */
private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean replay) throws AnalysisException {
    if (job.isDone()) {
        return;
    }
    if (!replay) {
        checkLoadEventWarmUpConflict(job);
    }
    if (!job.isEventDriven() && !job.isPeriodic()) {
        // only long-running jobs take part in duplicate detection
        return;
    }
    JobKey key = new JobKey(job.getSrcClusterName(), job.getDstClusterName(),
            job.getSyncMode(), job.getTableFilterExpr());
    if (this.repeatJobDetectionSet.add(key) || replay) {
        return;
    }
    throw new AnalysisException(key + " already has a runnable job");
}
/**
 * Rejects a new load-event warm-up job when a runnable load-event job of the other level
 * (table-level vs. cluster-level) already exists for the same source/destination pair.
 * Same-level duplicates are not handled here; they are rejected later through
 * {@code repeatJobDetectionSet}.
 *
 * @param newJob the job about to be created
 * @throws AnalysisException if a cross-level conflict is found
 */
private void checkLoadEventWarmUpConflict(CloudWarmUpJob newJob) throws AnalysisException {
    if (!isLoadEventWarmUpJob(newJob)) {
        return;
    }
    for (CloudWarmUpJob candidate : runnableCloudWarmUpJobs.values()) {
        // skip the job itself, finished jobs, non load-event jobs and other cluster pairs
        boolean comparable = candidate.getJobId() != newJob.getJobId()
                && !candidate.isDone()
                && isLoadEventWarmUpJob(candidate)
                && isSameWarmUpPair(newJob, candidate);
        if (comparable
                && isTableLevelLoadEventWarmUpJob(newJob) != isTableLevelLoadEventWarmUpJob(candidate)) {
            throw buildLoadEventWarmUpConflictException(newJob, candidate);
        }
    }
}
/**
 * Cancels every runnable, non-finished table-level load-event warm-up job whose source and
 * destination compute groups both belong to the given virtual compute group.
 * The member set is the union of {@code subComputeGroups} (if non-null), the active group
 * and the standby group.
 *
 * @param virtualComputeGroupName name of the virtual compute group, used in messages
 * @param activeComputeGroup active compute group of the virtual compute group
 * @param standbyComputeGroup standby compute group of the virtual compute group
 * @param subComputeGroups further member compute groups; may be {@code null}
 * @param reason prefix of the cancel reason recorded on each cancelled job
 * @throws AnalysisException if cancelling one of the jobs fails
 */
public void cancelTableLevelLoadEventWarmUpJobsForVirtualComputeGroup(
        String virtualComputeGroupName, String activeComputeGroup, String standbyComputeGroup,
        List<String> subComputeGroups, String reason) throws AnalysisException {
    String cancelReason = reason + " for virtual compute group '" + virtualComputeGroupName + "'";
    Set<String> members = new HashSet<>();
    if (subComputeGroups != null) {
        members.addAll(subComputeGroups);
    }
    members.add(activeComputeGroup);
    members.add(standbyComputeGroup);

    for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
        boolean liveTableLevelJob = !job.isDone() && isTableLevelLoadEventWarmUpJob(job);
        if (!liveTableLevelJob) {
            continue;
        }
        boolean insideGroup = members.contains(job.getSrcClusterName())
                && members.contains(job.getDstClusterName());
        if (!insideGroup) {
            continue;
        }
        try {
            cancel(job.getJobId(), cancelReason);
            LOG.info("cancel table-level load-event warm up job {} before virtual compute group '{}' creates "
                            + "cluster-level load-event warm up job. active compute group {}, "
                            + "standby compute group {}, source compute group {}, destination compute group {}{}, "
                            + "reason: {}",
                    job.getJobId(), virtualComputeGroupName, activeComputeGroup, standbyComputeGroup,
                    job.getSrcClusterName(), job.getDstClusterName(),
                    formatExistingTableFilter(job), cancelReason);
        } catch (DdlException e) {
            // surface the failure as an analysis error that tells the user what to cancel manually
            String message = "Failed to cancel table-level load-event warm up job "
                    + job.getJobId() + " before virtual compute group '" + virtualComputeGroupName
                    + "' creates cluster-level load-event warm up job from active compute group '"
                    + activeComputeGroup + "' to standby compute group '" + standbyComputeGroup
                    + "'. Source compute group '" + job.getSrcClusterName()
                    + "', destination compute group '" + job.getDstClusterName() + "'"
                    + formatExistingTableFilter(job) + ". Cancel table-level load-event warm up job "
                    + job.getJobId() + " before retrying.";
            throw new AnalysisException(message, e);
        }
    }
}
/** Returns true for a non-null, event-driven job whose sync event is LOAD. */
private static boolean isLoadEventWarmUpJob(CloudWarmUpJob job) {
    if (job == null || !job.isEventDriven()) {
        return false;
    }
    return job.getSyncEvent() == CloudWarmUpJob.SyncEvent.LOAD;
}
/** Returns true for a load-event warm-up job whose job type is CLUSTER. */
private static boolean isClusterLevelLoadEventWarmUpJob(CloudWarmUpJob job) {
    if (!isLoadEventWarmUpJob(job)) {
        return false;
    }
    return job.getJobType() == JobType.CLUSTER;
}
/** Returns true for a load-event warm-up job whose job type is TABLES. */
private static boolean isTableLevelLoadEventWarmUpJob(CloudWarmUpJob job) {
    if (!isLoadEventWarmUpJob(job)) {
        return false;
    }
    return job.getJobType() == JobType.TABLES;
}
/** Returns true when both jobs have equal source and equal destination cluster names. */
private static boolean isSameWarmUpPair(CloudWarmUpJob left, CloudWarmUpJob right) {
    if (!Objects.equals(left.getSrcClusterName(), right.getSrcClusterName())) {
        return false;
    }
    return Objects.equals(left.getDstClusterName(), right.getDstClusterName());
}
/**
 * Builds the error reported when a new load-event warm-up job conflicts with an existing
 * load-event job of the other level on the same source/destination pair.
 *
 * @param newJob the job that was about to be created
 * @param existingJob the conflicting runnable job
 * @return the exception to throw; the message names both levels, both compute groups,
 *         the existing job id and, if present, its table filter
 */
private static AnalysisException buildLoadEventWarmUpConflictException(
        CloudWarmUpJob newJob, CloudWarmUpJob existingJob) {
    String newJobLevel;
    if (isTableLevelLoadEventWarmUpJob(newJob)) {
        newJobLevel = "table-level";
    } else {
        newJobLevel = "cluster-level";
    }
    String existingJobLevel;
    if (isClusterLevelLoadEventWarmUpJob(existingJob)) {
        existingJobLevel = "cluster-level";
    } else {
        existingJobLevel = "table-level";
    }
    StringBuilder message = new StringBuilder();
    message.append("Cannot create ").append(newJobLevel).append(" load-event warm up job from source ")
            .append("compute group '").append(newJob.getSrcClusterName())
            .append("' to destination compute group '")
            .append(newJob.getDstClusterName()).append("': conflicting ").append(existingJobLevel)
            .append(" load-event warm up job ").append(existingJob.getJobId())
            .append(" already exists for the same source and destination")
            .append(formatExistingTableFilter(existingJob))
            .append(". Cancel existing load-event warm up job ").append(existingJob.getJobId())
            .append(" before creating this job.");
    return new AnalysisException(message.toString());
}
/** Returns " with table filter [expr]" for a job that has a table filter, otherwise "". */
private static String formatExistingTableFilter(CloudWarmUpJob job) {
    return job.hasTableFilter()
            ? " with table filter [" + job.getTableFilterExpr() + "]"
            : "";
}
// Tracks warm-up jobs scheduled by CacheHotSpotManager: destination cluster name -> id of the
// job currently registered as running there.
// Ensures that at most one non-event-driven job runs concurrently per destination cluster;
// entries are added by tryRegisterRunningJob and removed by deregisterRunningJob.
private Map<String, Long> clusterToRunningJobId = new ConcurrentHashMap<>();
/**
 * Tries to claim the destination cluster of {@code job} so the job can run.
 * <p>
 * Event-driven jobs are never registered and are always allowed. For every other job the
 * claim succeeds when no job is registered for the destination cluster yet, or when the
 * registered job is this very job; otherwise the job has to wait and {@code false} is returned.
 *
 * @param job the CloudWarmUpJob to register
 * @return {@code true} if the job is event-driven or now holds the registration; {@code false} otherwise
 */
public boolean tryRegisterRunningJob(CloudWarmUpJob job) {
    if (job.isEventDriven()) {
        return true;
    }
    String dstCluster = job.getDstClusterName();
    long candidateId = job.getJobId();
    // putIfAbsent makes the claim atomic: it returns the previous holder, or null if we won
    Long holderId = clusterToRunningJobId.putIfAbsent(dstCluster, candidateId);
    if (holderId == null || holderId == candidateId) {
        return true;
    }
    LOG.info("Job {} skipped: waiting for job {} to finish on destination cluster {}",
            candidateId, holderId, dstCluster);
    return false;
}
/**
 * Releases the destination cluster held by {@code job} so another job can run there.
 * <p>
 * Event-driven jobs are never registered, so nothing is removed for them. For other jobs
 * the entry is removed only if the registered id equals this job's id, which protects the
 * registration of a different job.
 *
 * @param job the CloudWarmUpJob to deregister
 * @return {@code true} if the job is event-driven or its registration was removed; {@code false} otherwise
 */
private boolean deregisterRunningJob(CloudWarmUpJob job) {
    return job.isEventDriven()
            || clusterToRunningJobId.remove(job.getDstClusterName(), job.getJobId());
}
/**
 * Releases the bookkeeping held for a job that stopped running.
 * One-time and periodic jobs give up their destination-cluster registration. If the job is
 * also done, event-driven and periodic jobs are removed from the duplicate-detection set.
 *
 * @param job the job that stopped
 */
public void notifyJobStop(CloudWarmUpJob job) {
    boolean holdsClusterSlot = job.isOnce() || job.isPeriodic();
    if (holdsClusterSlot) {
        this.deregisterRunningJob(job);
    }
    if (job.isDone() && (job.isEventDriven() || job.isPeriodic())) {
        JobKey key = new JobKey(job.getSrcClusterName(), job.getDstClusterName(),
                job.getSyncMode(), job.getTableFilterExpr());
        this.repeatJobDetectionSet.remove(key);
    }
}
/**
 * Creates the daemon named "CacheHotspotManager" with an interval of
 * {@code Config.fetch_cluster_cache_hotspot_interval_ms}.
 *
 * @param nodeMgr cloud system info service used to look up clusters and their backends
 */
public CacheHotspotManager(CloudSystemInfoService nodeMgr) {
super("CacheHotspotManager", Config.fetch_cluster_cache_hotspot_interval_ms);
this.nodeMgr = nodeMgr;
}
/**
 * One collection cycle of the hotspot manager.
 * <ol>
 *   <li>Starts the helper daemons on first use (the sync-stats daemon only in cloud mode).</li>
 *   <li>Creates the cache hotspot table if needed; on failure the interval is set to 60s
 *       and the cycle ends.</li>
 *   <li>Rebuilds the table lookup, asks every backend of every cluster for its top-N hot
 *       partitions and queues one row per reported partition.</li>
 *   <li>Flushes the remaining rows and clears the table lookup.</li>
 * </ol>
 */
@Override
public void runAfterCatalogReady() {
    if (!startJobDaemon) {
        jobDaemon = new JobDaemon();
        jobDaemon.start();
        startJobDaemon = true;
    }
    if (!startTableFilterRefreshDaemon) {
        tableFilterRefreshDaemon = new TableFilterRefreshDaemon();
        tableFilterRefreshDaemon.start();
        startTableFilterRefreshDaemon = true;
    }
    if (Config.isCloudMode() && !startWarmUpSyncStatsRefreshDaemon) {
        warmUpSyncStatsRefreshDaemon = new WarmUpSyncStatsRefreshDaemon();
        warmUpSyncStatsRefreshDaemon.start();
        startWarmUpSyncStatsRefreshDaemon = true;
    }

    if (!tableCreated) {
        try {
            CacheHotspotManagerUtils.execCreateCacheTable();
            tableCreated = true;
            this.intervalMs = Config.fetch_cluster_cache_hotspot_interval_ms;
        } catch (Exception e) {
            // sleep 60s wait for syncing storage vault info from ms and retry
            this.intervalMs = 60000;
            LOG.warn("Create cache hot spot table failed, sleep 60s and retry", e);
            return;
        }
    }

    traverseAllDatabaseForTable();

    // it's thread safe to iterate through this concurrent map's ref
    nodeMgr.getCloudClusterIdToBackend(false).entrySet().forEach(clusterEntry -> {
        // fire the RPCs for all backends of this cluster first, then wait for the answers
        List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> pendingCalls
                = new ArrayList<>();
        clusterEntry.getValue().forEach(be -> {
            try {
                pendingCalls.add(getTopNHotPartitionsAsync(be));
            } catch (TException | RpcException e) {
                LOG.warn("send getTopNHotPartitionsAsync to be {} failed due to {}", be, e);
            }
        });

        for (Pair<TGetTopNHotPartitionsResponse, Backend> answered : fetchOneClusterHotSpot(pendingCalls)) {
            TGetTopNHotPartitionsResponse resp = answered.first;
            if (!resp.isSetHotTables()) {
                continue;
            }
            for (THotTableMessage hotTable : resp.getHotTables()) {
                if (!hotTable.isSetHotPartitions()) {
                    continue;
                }
                for (THotPartition partition : hotTable.hot_partitions) {
                    insertIntoTable(clusterEntry.getKey(), hotTable.table_id,
                            hotTable.index_id, resp.file_cache_size, partition, answered.second);
                }
            }
        }
    });

    triggerBatchInsert();
    idToTable.clear();
}
/**
 * Resolves {@code clusterName} to a cluster id through {@code nodeMgr} and returns the result of
 * {@code CacheHotspotManagerUtils.clusterContains} for that id.
 * NOTE(review): presumably this reports whether hotspot rows exist for the cluster — confirm
 * against CacheHotspotManagerUtils.
 */
public boolean containsCluster(String clusterName) {
return CacheHotspotManagerUtils.clusterContains(nodeMgr.getCloudClusterIdByName(clusterName));
}
// Returns the rows produced by CacheHotspotManagerUtils.getClusterTopNPartitions for the cluster
// id that clusterName resolves to.
// Row layout as originally noted here: table_id, table_name, index_id, partition_id.
// NOTE(review): getHotTablets in this file parses column 2 as the partition id and column 3 as
// the index id, which disagrees with the order listed above — confirm against the query in
// CacheHotspotManagerUtils and correct whichever side is wrong.
public List<List<String>> getClusterTopNHotPartitions(String clusterName) {
LOG.debug("getClusterTopNHotPartitions called with clusterName={}", clusterName);
return CacheHotspotManagerUtils.getClusterTopNPartitions(nodeMgr.getCloudClusterIdByName(clusterName));
}
/**
 * Traverses all databases of the internal catalog and caches tableId -> table in
 * {@code idToTable}. Databases whose state is not NORMAL (already dropped) are skipped.
 * Each database is read under its read lock.
 */
private void traverseAllDatabaseForTable() {
    // dbs are stored in one concurrent hash map in catalog
    // return one list of snapshot, the item might be deleted
    // but java guarantee it will not be erased from memory
    for (Database database : Env.getCurrentInternalCatalog().getDbs()) {
        // Acquire the lock BEFORE entering try: if acquisition itself fails, the finally block
        // must not release a lock this thread never obtained.
        database.readLock();
        try {
            // skip databases that were already dropped
            if (database.getDbState() == Database.DbState.NORMAL) {
                // it's thread safe to merge one concurrent map
                idToTable.putAll(database.getIdToTableRef());
            }
        } finally {
            database.readUnlock();
        }
    }
}
/**
 * Writes the queued rows in {@code insertValueBatches} through
 * {@code CacheHotspotManagerUtils.doBatchInsert}. This is best effort: a failure is logged and
 * not rethrown, and the queue is cleared in either case, so rows of a failed insert are dropped.
 */
private void triggerBatchInsert() {
try {
CacheHotspotManagerUtils.doBatchInsert(insertValueBatches);
} catch (Exception e) {
LOG.warn("Failed to insert into file cache hotspot table due to ", e);
} finally {
// always reset the queue, also after a failed insert
insertValueBatches.clear();
}
}
/**
 * Refreshes the cached sync stats of all warm-up jobs and the matching metric definitions.
 * On a non-master FE the metric definitions are synced with an empty job list and nothing
 * else happens. On the master, stats are collected from the BEs; every job that is
 * event-driven and not done receives its stats (possibly {@code null} if none were
 * collected), all other jobs have their stats reset to {@code null}.
 */
private void refreshWarmUpSyncStats() {
    if (!Env.getCurrentEnv().isMaster()) {
        MetricRepo.syncCloudWarmUpSyncJobMetricDefinitions(Collections.emptyList());
        return;
    }
    Map<Long, JobWarmUpStats> collected = collectAndAggregate();
    for (CloudWarmUpJob job : cloudWarmUpJobs.values()) {
        JobWarmUpStats jobStats = null;
        if (job.isEventDriven() && !job.isDone()) {
            jobStats = collected.get(job.getJobId());
        }
        job.setSyncStats(jobStats);
    }
    MetricRepo.syncCloudWarmUpSyncJobMetricDefinitions(cloudWarmUpJobs.values());
}
/**
 * Queues one row for the cache hotspot table describing a hot partition reported by a backend.
 * The row is dropped when the table is unknown to {@code idToTable}, is not an OLAP table, or
 * when no partition of the table has the reported id. Once the queue holds at least
 * {@code Config.batch_insert_cluster_cache_hotspot_num} rows it is flushed.
 *
 * @param clusterId id of the cluster the reporting backend belongs to
 * @param tableId id of the hot table
 * @param indexId id of the hot index
 * @param fileCacheSize file cache size reported by the backend
 * @param partition hot partition message (id, access counters, last access time)
 * @param backend the backend that reported the partition
 */
private void insertIntoTable(String clusterId, long tableId, long indexId, long fileCacheSize,
        THotPartition partition, Backend backend) {
    LOG.info("table id {}, index id {}, partition id {}", tableId, indexId, partition.partition_id);

    // A single lookup replaces containsKey + get; a missing table is silently skipped.
    Table t = idToTable.get(tableId);
    if (t == null) {
        return;
    }
    // idToTable holds every table of every database, so guard the cast instead of letting a
    // ClassCastException abort the whole collection cycle.
    if (!(t instanceof OlapTable)) {
        LOG.warn("table id {} is not an olap table, skip hotspot record", tableId);
        return;
    }
    OlapTable olapTable = (OlapTable) t;

    // A partition may vanish between listing the names and resolving them, hence the null filter.
    Optional<Partition> op = t.getPartitionNames().stream().map(t::getPartition)
            .filter(p -> p != null && p.getId() == partition.partition_id).findAny();
    if (!op.isPresent()) {
        LOG.warn("partition id {} is invalid", partition.partition_id);
        return;
    }

    LocalDateTime now = LocalDateTime.now();
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
    DateTimeFormatter dateformatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    Map<String, String> params = new HashMap<>();
    params.put("cluster_id", clusterId);
    params.put("cluster_name", nodeMgr.getClusterNameByClusterId(clusterId));
    params.put("backend_id", String.valueOf(backend.getId()));
    params.put("creation_time", now.format(formatter));
    params.put("file_cache_size", String.valueOf(fileCacheSize));
    params.put("insert_day", now.format(dateformatter));
    params.put("table_id", String.valueOf(tableId));
    params.put("table_name", String.format("%s.%s", t.getDBName(), t.getName()));
    params.put("index_name", String.valueOf(olapTable.getIndexNameById(indexId)));
    params.put("index_id", String.valueOf(indexId));
    params.put("partition_name", op.get().getName());
    params.put("partition_id", String.valueOf(partition.partition_id));

    LOG.info("has qpd {}, has qpw {}", partition.isSetQueryPerDay(), partition.isSetQueryPerWeek());
    // unset counters are recorded as 0
    params.put("qpd", partition.isSetQueryPerDay() ? String.valueOf(partition.getQueryPerDay()) : "0");
    params.put("qpw", partition.isSetQueryPerWeek() ? String.valueOf(partition.getQueryPerWeek()) : "0");

    // Doris's datetime v2 doesn't support time zone
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(partition.last_access_time),
            ZoneId.systemDefault());
    params.put("last_access_time", localDateTime.format(formatter));

    CacheHotspotManagerUtils.transformIntoCacheHotSpotTableValue(params, insertValueBatches);
    // ">=" instead of "==": if the queue ever overshoots the threshold (for example after the
    // config value is lowered at runtime) the flush must still happen.
    if (insertValueBatches.size() >= Config.batch_insert_cluster_cache_hotspot_num) {
        triggerBatchInsert();
    }
}
/**
 * Starts an asynchronous getTopNHotPartitions RPC against one backend.
 * The call runs through {@code CompletableFuture.supplyAsync} without an explicit executor.
 * The borrowed client goes back to the pool after a successful call and is invalidated
 * otherwise; any failure completes the future exceptionally with a {@link RuntimeException}
 * wrapping the cause.
 *
 * @param be the backend to query
 * @return the pending response paired with the backend it was requested from
 */
private Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>
        getTopNHotPartitionsAsync(Backend be) throws TException, RpcException, RuntimeException {
    CompletableFuture<TGetTopNHotPartitionsResponse> pending = CompletableFuture.supplyAsync(() -> {
        TNetworkAddress beAddress = null;
        BackendService.Client rpcClient = null;
        boolean succeeded = false;
        try {
            beAddress = new TNetworkAddress(be.getHost(), be.getBePort());
            rpcClient = ClientPool.backendPool.borrowObject(beAddress);
            TGetTopNHotPartitionsRequest request = new TGetTopNHotPartitionsRequest();
            TGetTopNHotPartitionsResponse response = rpcClient.getTopNHotPartitions(request);
            succeeded = true;
            return response;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // a client that failed mid-call must not be reused
            if (!succeeded) {
                ClientPool.backendPool.invalidateObject(beAddress, rpcClient);
            } else {
                ClientPool.backendPool.returnObject(beAddress, rpcClient);
            }
        }
    });
    return Pair.of(pending, be);
}
// the message we need:
// cluster_id varchar,
// cluster_name varchar,
// backend_id bigint,
// creation_time DATETIMEV2,
// file_cache_size bigint,
// table_name varchar,
// partition_name varchar,
// last_access_time DATETIMEV2
/**
 * Waits for the hot-partition responses of one cluster's backends.
 * Each future is awaited for at most min(5000, Config.remote_fragment_exec_timeout_ms)
 * milliseconds. Failed or timed-out backends are logged and left out of the result. If the
 * calling thread is interrupted, its interrupt flag is restored and the remaining futures
 * are not awaited.
 *
 * @param futureList pending responses paired with the backend each was requested from
 * @return the responses that arrived in time, paired with their backend
 */
private List<Pair<TGetTopNHotPartitionsResponse, Backend>> fetchOneClusterHotSpot(
        List<Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend>> futureList) {
    List<Pair<TGetTopNHotPartitionsResponse, Backend>> responseList = new ArrayList<>();
    // temporary value, would change to config
    long timeoutMs = Math.min(5000, Config.remote_fragment_exec_timeout_ms);
    for (Pair<CompletableFuture<TGetTopNHotPartitionsResponse>, Backend> futureBackendPair : futureList) {
        TStatusCode code = TStatusCode.OK;
        String errMsg = null;
        Exception exception = null;
        boolean interrupted = false;
        Future<TGetTopNHotPartitionsResponse> f = futureBackendPair.key();
        try {
            TGetTopNHotPartitionsResponse result = f.get(timeoutMs, TimeUnit.MILLISECONDS);
            responseList.add(Pair.of(result, futureBackendPair.second));
        } catch (ExecutionException e) {
            exception = e;
            code = TStatusCode.THRIFT_RPC_ERROR;
        } catch (InterruptedException e) {
            // Restore the flag so the owning daemon can observe the interruption.
            Thread.currentThread().interrupt();
            interrupted = true;
            exception = e;
            code = TStatusCode.INTERNAL_ERROR;
        } catch (TimeoutException e) {
            exception = e;
            errMsg = "timeout when waiting for fetch cache hotspot RPC. Wait(sec): " + timeoutMs / 1000;
            code = TStatusCode.TIMEOUT;
        }
        if (code != TStatusCode.OK) {
            LOG.warn("Fetch be {}'s cache hotspot information throw {}, errmsg {}",
                    futureBackendPair.second.getAddress(), exception, errMsg);
        }
        if (interrupted) {
            // every further get() would fail immediately with the flag set; stop waiting
            break;
        }
    }
    return responseList;
}
/**
 * Sums the file cache size reported by every backend of the given cluster.
 * Each backend is queried synchronously with a getTopNHotPartitions RPC. The borrowed client
 * is returned to the pool on success and invalidated on failure.
 *
 * @param clusterName name of the cluster whose backends are queried
 * @return total file cache size over all backends of the cluster
 * @throws RuntimeException wrapping the first failure of any backend RPC
 */
Long getFileCacheCapacity(String clusterName) throws RuntimeException {
    CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
    List<Backend> clusterBackends = systemInfo.getBackendsByClusterName(clusterName);
    long capacity = 0L;
    for (Backend be : clusterBackends) {
        TNetworkAddress beAddress = null;
        BackendService.Client rpcClient = null;
        boolean succeeded = false;
        long beCacheSize = 0L;
        try {
            beAddress = new TNetworkAddress(be.getHost(), be.getBePort());
            rpcClient = ClientPool.backendPool.borrowObject(beAddress);
            TGetTopNHotPartitionsResponse response = rpcClient.getTopNHotPartitions(
                    new TGetTopNHotPartitionsRequest());
            beCacheSize = response.file_cache_size;
            succeeded = true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // a client that failed mid-call must not be reused
            if (!succeeded) {
                ClientPool.backendPool.invalidateObject(beAddress, rpcClient);
            } else {
                ClientPool.backendPool.returnObject(beAddress, rpcClient);
            }
        }
        capacity += beCacheSize;
    }
    return capacity;
}
/**
 * Splits the tablets assigned to each backend into batches of tablet ids whose accumulated
 * data size stays within {@code Config.cloud_warm_up_job_max_bytes_per_batch}.
 * Tablet order is preserved. A tablet that alone exceeds the limit gets a batch of its own;
 * no empty batch is ever produced. A backend without tablets maps to an empty batch list.
 *
 * @param beToWarmUpTablets backend id -> tablets to warm up on that backend
 * @return backend id -> ordered list of tablet-id batches
 */
public Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> beToWarmUpTablets) {
    final long maxSizePerBatch = Config.cloud_warm_up_job_max_bytes_per_batch;
    Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
    for (Map.Entry<Long, List<Tablet>> entry : beToWarmUpTablets.entrySet()) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> batch = new ArrayList<>();
        long curBatchSize = 0L;
        for (Tablet tablet : entry.getValue()) {
            // read the size once so the limit check and the accumulation use the same value
            long tabletSize = tablet.getDataSize(true, false);
            // Close the current batch only if it already holds something; otherwise an
            // oversized first tablet would push an empty batch into the result.
            if (!batch.isEmpty() && curBatchSize + tabletSize > maxSizePerBatch) {
                batches.add(batch);
                batch = new ArrayList<>();
                curBatchSize = 0L;
            }
            batch.add(tablet.getId());
            curBatchSize += tabletSize;
        }
        if (!batch.isEmpty()) {
            batches.add(batch);
        }
        beToTabletIdBatches.put(entry.getKey(), batches);
    }
    return beToTabletIdBatches;
}
/**
 * Selects tablets of the source cluster's hot partitions until their accumulated data size
 * reaches the file cache capacity of the destination cluster.
 * Rows whose database, table, partition or index can no longer be resolved are skipped.
 * The selection is returned in reverse order of collection.
 *
 * @param srcClusterName cluster whose recorded hot partitions are used
 * @param dstClusterName cluster whose total file cache capacity bounds the selection
 * @return the selected tablets, last collected first
 */
private List<Tablet> getHotTablets(String srcClusterName, String dstClusterName) {
    long dstCapacity = getFileCacheCapacity(dstClusterName);
    List<List<String>> hotRows = getClusterTopNHotPartitions(srcClusterName);
    long selectedBytes = 0L;
    List<Tablet> selected = new ArrayList<>();
    for (List<String> row : hotRows) {
        long tableId = Long.parseLong(row.get(0));
        // column 1 is "<db>.<table>"; only the database part is needed
        String[] qualifiedName = row.get(1).split("\\.");
        String dbName = qualifiedName[0];
        long partitionId = Long.parseLong(row.get(2));
        long indexId = Long.parseLong(row.get(3));

        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
        if (db == null) {
            continue;
        }
        OlapTable table = (OlapTable) db.getTableNullable(tableId);
        if (table == null) {
            continue;
        }
        Partition partition = table.getPartition(partitionId);
        if (partition == null) {
            continue;
        }
        MaterializedIndex index = partition.getIndex(indexId);
        if (index == null) {
            continue;
        }

        for (Tablet tablet : index.getTablets()) {
            selectedBytes += tablet.getDataSize(true, false);
            selected.add(tablet);
            if (selectedBytes >= dstCapacity) {
                break;
            }
        }
        // capacity reached: stop scanning further rows
        if (selectedBytes >= dstCapacity) {
            break;
        }
    }
    Collections.reverse(selected);
    return selected;
}
/**
 * Collects the tablets of every VISIBLE materialized index of every partition of every OLAP
 * table in the internal catalog. Neither parameter is consulted by the current implementation.
 *
 * @param srcClusterName unused
 * @param dstClusterName unused
 * @return all collected tablets in catalog iteration order
 */
private List<Tablet> getAllTablets(String srcClusterName, String dstClusterName) {
    List<Tablet> collected = new ArrayList<>();
    for (Database db : Env.getCurrentInternalCatalog().getDbs()) {
        for (Table table : db.getTables()) {
            if (table instanceof OlapTable) {
                for (Partition partition : ((OlapTable) table).getPartitions()) {
                    // Maybe IndexExtState.ALL
                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                        collected.addAll(index.getTablets());
                    }
                }
            }
        }
    }
    return collected;
}
/**
 * Computes, for every backend of the destination cluster, which tablets it should warm up.
 * The candidate tablets are all tablets when {@code Config.cloud_warm_up_force_all_partitions}
 * is set, otherwise the hot tablets of the source cluster. A backend receives those candidates
 * whose id is contained in the rebalancer's primary-tablet snapshot for that backend.
 *
 * @param dstClusterName cluster to warm up
 * @param srcClusterName cluster whose hotspot information is used
 * @return backend id -> tablets to warm up on that backend (possibly empty lists)
 */
public Map<Long, List<Tablet>> warmUpNewClusterByCluster(String dstClusterName, String srcClusterName) {
    List<Tablet> candidates = Config.cloud_warm_up_force_all_partitions
            ? getAllTablets(srcClusterName, dstClusterName)
            : getHotTablets(srcClusterName, dstClusterName);
    CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
    List<Backend> dstBackends = systemInfo.getBackendsByClusterName(dstClusterName);
    Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
    for (Backend be : dstBackends) {
        Set<Long> primaryTabletIds = ((CloudEnv) Env.getCurrentEnv())
                .getCloudTabletRebalancer()
                .getSnapshotTabletsInPrimaryByBeId(be.getId());
        // keep candidate order, restricted to the tablets this backend is primary for
        List<Tablet> assigned = candidates.stream()
                .filter(tablet -> primaryTabletIds.contains(tablet.getId()))
                .collect(Collectors.toCollection(ArrayList::new));
        beToWarmUpTablets.put(be.getId(), assigned);
    }
    return beToWarmUpTablets;
}
/**
 * Returns the info row of one warm-up job, enriched with freshly collected warm-up stats.
 *
 * @param jobId id of the job to describe
 * @return a single-element list holding the job's info row
 * @throws AnalysisException if no job with that id exists
 */
public List<List<String>> getSingleJobInfo(long jobId) throws AnalysisException {
    List<List<String>> rows = new ArrayList<List<String>>();
    CloudWarmUpJob job = cloudWarmUpJobs.get(jobId);
    if (job == null) {
        throw new AnalysisException("cloud warm up with job " + jobId + " does not exist");
    }
    // stats are only present for runnable event-driven jobs; otherwise null is passed on
    JobWarmUpStats jobStats = collectAndAggregate().get(jobId);
    rows.add(job.getJobInfo(jobStats, true));
    return rows;
}
/**
 * Scheduler daemon for warm-up jobs. Every cycle it runs the warm-up jobs; once the cycle
 * counter has reached CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB it first clears finished
 * or cancelled jobs and restarts the count.
 */
private class JobDaemon extends MasterDaemon {
    JobDaemon() {
        super("JobDaemon", Config.cloud_warm_up_job_scheduler_interval_millisecond);
        LOG.info("start cloud warm up job daemon");
    }

    @Override
    public void runAfterCatalogReady() {
        boolean cleanupDue = cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_CLOUD_WARM_UP_JOB;
        if (cleanupDue) {
            clearFinishedOrCancelCloudWarmUpJob();
            cycleCount = 0;
        }
        cycleCount++;
        runCloudWarmUpJob();
    }
}
/**
 * Daemon that calls {@code refreshAllTableFilters()} every cycle. Before doing so it adopts
 * a changed {@code Config.cloud_warm_up_table_filter_refresh_interval_ms} as its interval.
 */
private class TableFilterRefreshDaemon extends MasterDaemon {
    TableFilterRefreshDaemon() {
        super("TableFilterRefreshDaemon", Config.cloud_warm_up_table_filter_refresh_interval_ms);
        LOG.info("start table filter refresh daemon, interval={}ms",
                Config.cloud_warm_up_table_filter_refresh_interval_ms);
    }

    @Override
    public void runAfterCatalogReady() {
        boolean intervalChanged = getInterval() != Config.cloud_warm_up_table_filter_refresh_interval_ms;
        if (intervalChanged) {
            setInterval(Config.cloud_warm_up_table_filter_refresh_interval_ms);
            LOG.info("update table filter refresh daemon interval to {}ms", getInterval());
        }
        refreshAllTableFilters();
    }
}
/**
 * Daemon that calls {@code refreshWarmUpSyncStats()} every cycle. Before doing so it adopts
 * a changed {@code Config.cloud_warm_up_sync_stats_refresh_interval_ms} as its interval.
 */
private class WarmUpSyncStatsRefreshDaemon extends MasterDaemon {
    WarmUpSyncStatsRefreshDaemon() {
        super("WarmUpSyncStatsRefreshDaemon", Config.cloud_warm_up_sync_stats_refresh_interval_ms);
        LOG.info("start warm up sync stats refresh daemon, interval={}ms",
                Config.cloud_warm_up_sync_stats_refresh_interval_ms);
    }

    @Override
    public void runAfterCatalogReady() {
        boolean intervalChanged = getInterval() != Config.cloud_warm_up_sync_stats_refresh_interval_ms;
        if (intervalChanged) {
            setInterval(Config.cloud_warm_up_sync_stats_refresh_interval_ms);
            LOG.info("update warm up sync stats refresh daemon interval to {}ms", getInterval());
        }
        refreshWarmUpSyncStats();
    }
}
/**
 * Collects warm-up stats from all alive BEs of the clusters used by runnable event-driven
 * jobs and aggregates them per job. Called on demand by {@code getSingleJobInfo} and
 * periodically by {@code refreshWarmUpSyncStats}.
 * <p>
 * Collection is best effort: a BE whose request fails is logged and skipped. Waiting for the
 * next BE response is bounded to 10 seconds; when that elapses, or the calling thread is
 * interrupted, the stats gathered so far are aggregated.
 *
 * @return per-job aggregated warmup stats; empty map if no event-driven jobs exist or no
 *         alive BE was found
 */
private Map<Long, JobWarmUpStats> collectAndAggregate() {
    Map<Long, JobWarmUpStats> result = new HashMap<>();
    // 1. Collect all clusters involved in event-driven jobs
    Set<String> allClusters = new HashSet<>();
    for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
        if (job.isEventDriven()) {
            allClusters.add(job.getSrcClusterName());
            allClusters.add(job.getDstClusterName());
        }
    }
    if (allClusters.isEmpty()) {
        return result;
    }
    // 2. Enumerate all (cluster, BE) pairs
    List<Pair<String, Backend>> allTargets = new ArrayList<>();
    for (String cluster : allClusters) {
        for (Backend be : getBackendsFromCluster(cluster)) {
            if (be.isAlive()) {
                allTargets.add(Pair.of(cluster, be));
            }
        }
    }
    if (allTargets.isEmpty()) {
        return result;
    }
    // 3. Concurrent HTTP requests to all BEs
    ExecutorCompletionService<Pair<String, String>> completionService =
            new ExecutorCompletionService<>(warmupStatsExecutor);
    // Acquire auth token once for all BE requests (needed when enable_all_http_auth is on)
    Map<String, String> authHeaders = new HashMap<>();
    try {
        String token = Env.getCurrentEnv().getTokenManager().acquireToken();
        authHeaders.put("Auth-Token", token);
    } catch (Exception e) {
        LOG.warn("Failed to acquire auth token for warmup stats collection, "
                + "requests may fail if enable_all_http_auth is enabled: {}", e.getMessage());
    }
    for (Pair<String, Backend> target : allTargets) {
        String cluster = target.first;
        Backend be = target.second;
        completionService.submit(() -> {
            String url = "http://"
                    + NetUtils.getHostPortInAccessibleFormat(be.getHost(), be.getHttpPort())
                    + "/api/warmup_event_driven_stats";
            String json = HttpUtils.doGet(url, authHeaders, 5000);
            return Pair.of(cluster, json);
        });
    }
    // 4. Collect results and merge by cluster -> jobId
    Map<String, Map<Long, TableWarmUpWindowedStats>> clusterStats = new HashMap<>();
    for (int i = 0; i < allTargets.size(); i++) {
        try {
            // poll() bounds the wait for the next completed request. take() would block without
            // limit, and a timed get() on an already completed future never waits at all.
            Future<Pair<String, String>> future = completionService.poll(10, TimeUnit.SECONDS);
            if (future == null) {
                LOG.warn("Timed out waiting for warmup stats, received {} of {} BE responses",
                        i, allTargets.size());
                break;
            }
            Pair<String, String> resultPair = future.get();
            String cluster = resultPair.first;
            String json = resultPair.second;
            Map<Long, TableWarmUpWindowedStats> jobMap =
                    clusterStats.computeIfAbsent(cluster, k -> new HashMap<>());
            mergeStatsFromJson(jobMap, json);
        } catch (InterruptedException e) {
            // Restore the flag for the caller and stop waiting for further responses.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while collecting warmup stats, received {} of {} BE responses",
                    i, allTargets.size());
            break;
        } catch (Exception e) {
            LOG.warn("Failed to collect warmup stats: {}", e.getMessage());
        }
    }
    // 5. Aggregate per-job
    for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
        if (!job.isEventDriven()) {
            continue;
        }
        JobWarmUpStats stats = aggregateStatsForJob(job, clusterStats);
        result.put(job.getJobId(), stats);
    }
    return result;
}
/**
 * Parses one BE JSON response and folds its per-job entries into {@code jobMap}.
 * JSON structure: data[].{job_id, requested, finish, fail, ...}
 *
 * <p>A job id seen for the first time is stored as-is; a job id already present has the new
 * entry merged into the existing one. Any exception raised while parsing is logged and
 * ends processing of this response; entries merged before the failure are kept.
 */
private void mergeStatsFromJson(
        Map<Long, TableWarmUpWindowedStats> jobMap, String json) {
    try {
        JsonArray entries = JsonParser.parseString(json).getAsJsonObject().getAsJsonArray("data");
        if (entries != null) {
            for (JsonElement entry : entries) {
                JsonObject jobJson = entry.getAsJsonObject();
                long jobId = jobJson.get("job_id").getAsLong();
                TableWarmUpWindowedStats incoming = TableWarmUpWindowedStats.fromJson(jobJson);
                jobMap.compute(jobId, (key, current) -> {
                    if (current != null) {
                        current.merge(incoming);
                        return current;
                    }
                    return incoming;
                });
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to parse warmup stats JSON: {}", e.getMessage());
    }
}
/**
 * Builds the stats of a single job from the per-cluster stats: the entry found under the
 * job's source cluster feeds the "requested" side, the entry found under its destination
 * cluster feeds the "finished" side. A missing entry on either side is simply skipped.
 */
@VisibleForTesting
JobWarmUpStats aggregateStatsForJob(
        CloudWarmUpJob job,
        Map<String, Map<Long, TableWarmUpWindowedStats>> clusterStats) {
    long jobId = job.getJobId();
    Map<Long, TableWarmUpWindowedStats> srcJobs =
            clusterStats.getOrDefault(job.getSrcClusterName(), Collections.emptyMap());
    Map<Long, TableWarmUpWindowedStats> dstJobs =
            clusterStats.getOrDefault(job.getDstClusterName(), Collections.emptyMap());
    TableWarmUpWindowedStats requestedSide = srcJobs.get(jobId);
    TableWarmUpWindowedStats finishedSide = dstJobs.get(jobId);

    JobWarmUpStats aggregated = new JobWarmUpStats();
    if (requestedSide != null) {
        aggregated.mergeRequested(requestedSide);
    }
    if (finishedSide != null) {
        // Target-side progress timestamp is a watermark, not an additive counter. The merge
        // keeps the minimum positive watermark across BEs so FE reports the slowest target
        // progress for trigger-gap calculation.
        aggregated.mergeFinished(finishedSide);
    }
    aggregated.computeGap();
    return aggregated;
}
/**
 * Drops jobs that are done from the runnable set, then purges expired jobs from the set of
 * all jobs. Each expired job is switched to {@code JobState.DELETED} and written to the
 * edit log before it is removed.
 */
private void clearFinishedOrCancelCloudWarmUpJob() {
    Iterator<Entry<Long, CloudWarmUpJob>> runnableIt = runnableCloudWarmUpJobs.entrySet().iterator();
    while (runnableIt.hasNext()) {
        if (runnableIt.next().getValue().isDone()) {
            runnableIt.remove();
        }
    }

    Iterator<Map.Entry<Long, CloudWarmUpJob>> allJobsIt = cloudWarmUpJobs.entrySet().iterator();
    while (allJobsIt.hasNext()) {
        CloudWarmUpJob candidate = allJobsIt.next().getValue();
        if (!candidate.isExpire()) {
            continue;
        }
        candidate.setJobState(JobState.DELETED);
        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(candidate);
        allJobsIt.remove();
        LOG.info("remove expired cloud warm up job {}. finish at {}",
                candidate.getJobId(), TimeUtils.longToTimeString(candidate.getFinishedTimeMs()));
    }
}
/**
 * Returns the map of all tracked cloud warm up jobs keyed by job id. This is the manager's
 * own map, not a copy.
 */
public Map<Long, CloudWarmUpJob> getCloudWarmUpJobs() {
    return cloudWarmUpJobs;
}
/**
 * Looks up a tracked cloud warm up job by id.
 *
 * @param jobId id of the job
 * @return the job registered under {@code jobId}, or {@code null} if the map has no entry for it
 */
public CloudWarmUpJob getCloudWarmUpJob(long jobId) {
    CloudWarmUpJob job = cloudWarmUpJobs.get(jobId);
    return job;
}
/**
 * Returns the display rows of the most recently created jobs.
 *
 * <p>Warm up stats are collected from the BEs first, then all tracked jobs are ordered by
 * creation time (newest first), cut to {@code limit} entries and rendered with the stats
 * collected for their id (which may be absent).
 *
 * @param limit maximum number of jobs to report
 * @return one info row per reported job, newest job first
 */
public List<List<String>> getAllJobInfos(int limit) {
    Map<Long, JobWarmUpStats> statsByJobId = collectAndAggregate();
    Comparator<CloudWarmUpJob> newestFirst =
            Comparator.comparing(CloudWarmUpJob::getCreateTimeMs).reversed();
    List<CloudWarmUpJob> selectedJobs = cloudWarmUpJobs.values().stream()
            .sorted(newestFirst)
            .limit(limit)
            .collect(Collectors.toList());
    List<List<String>> infos = Lists.newArrayList();
    for (CloudWarmUpJob job : selectedJobs) {
        infos.add(job.getJobInfo(statsByJobId.get(job.getJobId()), false));
    }
    return infos;
}
/**
 * Registers a job with this manager: restores its table filter state, registers it for
 * repeat detection, then adds it to both the set of all jobs and the set of runnable jobs.
 *
 * @param job the job to register
 * @throws AnalysisException propagated from repeat-detection registration
 */
public void addCloudWarmUpJob(CloudWarmUpJob job) throws AnalysisException {
    restoreTableFilterState(job);
    registerJobForRepeatDetection(job, false);
    long jobId = job.getJobId();
    cloudWarmUpJobs.put(jobId, job);
    LOG.info("add cloud warm up job {}", jobId);
    runnableCloudWarmUpJobs.put(jobId, job);
}
/**
 * Rebuilds the ON TABLES filter of a job and re-resolves the tables it currently matches.
 * Jobs without a table filter are left untouched.
 */
private void restoreTableFilterState(CloudWarmUpJob job) {
    if (job.hasTableFilter()) {
        job.rebuildOnTablesFilter();
        Map<Long, String> matchedTables = resolveTableIds(job.getOnTablesFilter());
        job.setCurrentTableIdNames(matchedTables);
        logMatchedTables("restored table filter for job " + job.getJobId(), matchedTables);
    }
}
/**
 * Resolves a (database, table, partition) triple to the partitions it designates.
 *
 * @param tableTriple left = database name, middle = table name, right = partition name;
 *                    a {@code null} or empty partition name selects every partition of the table
 * @return the single named partition, or all partitions of the table
 * @throws IllegalArgumentException if the database, the table or the named partition cannot
 *                                  be found, or if the table is not an {@link OlapTable}
 */
public List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
    String dbName = tableTriple.getLeft();
    String tableName = tableTriple.getMiddle();
    String partitionName = tableTriple.getRight();
    // Both lookups are "nullable" variants: fail with a message naming the missing object
    // instead of a bare NullPointerException / ClassCastException further down.
    Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
    if (db == null) {
        throw new IllegalArgumentException("Database " + dbName + " does not exist");
    }
    TableIf table = db.getTableNullable(tableName);
    if (!(table instanceof OlapTable)) {
        throw new IllegalArgumentException("Table " + dbName + "." + tableName
                + (table == null ? " does not exist" : " is not an OLAP table"));
    }
    OlapTable olapTable = (OlapTable) table;
    List<Partition> partitions = new ArrayList<>();
    if (partitionName == null || partitionName.isEmpty()) {
        partitions.addAll(olapTable.getPartitions());
    } else {
        Partition partition = olapTable.getPartition(partitionName);
        if (partition == null) {
            // Never hand a null element to callers, they dereference every partition returned.
            throw new IllegalArgumentException("Partition " + partitionName + " does not exist in table "
                    + dbName + "." + tableName);
        }
        partitions.add(partition);
    }
    return partitions;
}
/**
 * Returns the backends that the cloud system info service reports for the given cluster name.
 *
 * @param dstClusterName name of the cluster to look up
 */
public List<Backend> getBackendsFromCluster(String dstClusterName) {
    CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
    return systemInfo.getBackendsByClusterName(dstClusterName);
}
/**
 * Returns the tablet ids that the cloud tablet rebalancer's snapshot lists as primary on
 * the given backend.
 *
 * @param beId id of the backend
 */
public Set<Long> getTabletIdsFromBe(long beId) {
    CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
    return cloudEnv.getCloudTabletRebalancer().getSnapshotTabletsInPrimaryByBeId(beId);
}
/**
 * Flattens the tablets of the given materialized indexes into one new list, preserving the
 * order of the indexes and of the tablets inside each index.
 */
public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
    List<Tablet> collected = new ArrayList<>();
    indexes.forEach(index -> collected.addAll(index.getTablets()));
    return collected;
}
/**
 * Plans a table-based warm up: picks the partitions of the requested tables that should be
 * warmed up on {@code dstClusterName} and distributes their tablets over the cluster's
 * backends according to the tablet ids reported by {@link #getTabletIdsFromBe(long)}.
 *
 * <p>Partitions are added in order while their accumulated data size is tracked against the
 * cluster's file cache capacity. The partition that makes the total exceed the capacity is
 * still included; after it no further partition or table is considered.
 *
 * @param jobId          id of the warm up job, used for logging only
 * @param dstClusterName cluster to warm up
 * @param tables         (database, table, partition) triples; an empty partition name means
 *                       all partitions of the table
 * @param isForce        when {@code false}, exceeding the cache capacity is an error
 * @return backend id to the tablets that backend should warm up
 * @throws RuntimeException if the selected data exceeds the cache capacity and
 *                          {@code isForce} is {@code false}
 */
public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
        List<Triple<String, String, String>> tables,
        boolean isForce) throws RuntimeException {
    Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
    // Primitive longs: the running total is updated once per partition, a boxed accumulator
    // would allocate a new Long on every addition.
    long totalFileCache = getFileCacheCapacity(dstClusterName);
    long warmUpTotalFileCache = 0L;
    LOG.info("Start warm up job {}, cluster {}, total cache size: {}",
            jobId, dstClusterName, totalFileCache);
    List<Backend> backends = getBackendsFromCluster(dstClusterName);
    LOG.info("Got {} backends for cluster {}", backends.size(), dstClusterName);
    // Fetch the tablet ids of every backend once, they are reused for each table below.
    Map<Long, Set<Long>> beToTabletIds = new HashMap<>();
    for (Backend backend : backends) {
        beToTabletIds.put(backend.getId(), getTabletIdsFromBe(backend.getId()));
    }
    for (Triple<String, String, String> tableTriple : tables) {
        if (warmUpTotalFileCache > totalFileCache) {
            LOG.info("Warm up size {} exceeds total cache size {}, breaking loop",
                    warmUpTotalFileCache, totalFileCache);
            break;
        }
        List<Partition> partitions = getPartitionsFromTriple(tableTriple);
        LOG.info("Got {} partitions for table {}.{}.{}", partitions.size(),
                tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
        List<Partition> warmUpPartitions = new ArrayList<>();
        for (Partition partition : partitions) {
            long partitionSize = partition.getDataSize(true);
            warmUpTotalFileCache += partitionSize;
            // The partition crossing the limit is kept on purpose; the check comes after the add.
            warmUpPartitions.add(partition);
            if (warmUpTotalFileCache > totalFileCache) {
                LOG.info("Warm up size {} exceeds total cache size {}, current partition size {}",
                        warmUpTotalFileCache, totalFileCache, partitionSize);
                break;
            }
        }
        List<MaterializedIndex> indexes = new ArrayList<>();
        for (Partition partition : warmUpPartitions) {
            indexes.addAll(partition.getMaterializedIndices(IndexExtState.VISIBLE));
        }
        LOG.info("Got {} materialized indexes for table {}.{}.{}", indexes.size(),
                tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
        List<Tablet> tablets = getTabletsFromIndexs(indexes);
        LOG.info("Got {} tablets for table {}.{}.{}", tablets.size(),
                tableTriple.getLeft(), tableTriple.getMiddle(), tableTriple.getRight());
        // Give each backend the selected tablets whose id is in that backend's tablet id set.
        for (Backend backend : backends) {
            Set<Long> beTabletIds = beToTabletIds.get(backend.getId());
            List<Tablet> warmUpTablets = new ArrayList<>();
            for (Tablet tablet : tablets) {
                if (beTabletIds.contains(tablet.getId())) {
                    warmUpTablets.add(tablet);
                }
            }
            LOG.info("Assigning {} tablets to backend {}", warmUpTablets.size(), backend.getId());
            beToWarmUpTablets.computeIfAbsent(backend.getId(),
                    k -> new ArrayList<>()).addAll(warmUpTablets);
        }
    }
    LOG.info("The job {} warm up size is {}, the cluster cache size is {}",
            jobId, warmUpTotalFileCache, totalFileCache);
    if (warmUpTotalFileCache > totalFileCache && !isForce) {
        throw new RuntimeException("The cluster " + dstClusterName + " cache size is not enough");
    }
    return beToWarmUpTablets;
}
/**
 * Creates a cloud warm up job from a WARM UP command, registers it with this manager and
 * writes it to the edit log.
 *
 * <p>A table-based command produces a {@code JobType.TABLE} job whose per-backend tablet
 * batches are planned by {@link #warmUpNewClusterByTable} (planning is skipped when
 * {@code FeConstants.runningUnitTest} is set). Any other command produces a cluster job
 * whose sync mode comes from the {@code sync_mode} property:
 * <ul>
 *   <li>{@code periodic} - requires a numeric {@code sync_interval_sec};</li>
 *   <li>{@code event_driven} - requires {@code sync_event} naming a
 *       {@code CloudWarmUpJob.SyncEvent} constant (case-insensitive); ON TABLES rules, if
 *       present, turn the job into a {@code JobType.TABLES} job and must match at least
 *       one table;</li>
 *   <li>anything else - the job runs once.</li>
 * </ul>
 *
 * @param stmt the warm up command
 * @return id of the created job
 * @throws AnalysisException if a required property is missing or malformed, if the
 *                           ON TABLES filter matches no table, or if registering the job fails
 */
public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
    long jobId = Env.getCurrentEnv().getNextId();
    CloudWarmUpJob warmUpJob;
    if (stmt.isWarmUpWithTable()) {
        Map<Long, List<Tablet>> beToWarmUpTablets = new HashMap<>();
        if (!FeConstants.runningUnitTest) {
            beToWarmUpTablets = warmUpNewClusterByTable(jobId, stmt.getDstCluster(), stmt.getTables(),
                    stmt.isForce());
        }
        Map<Long, List<List<Long>>> beToTabletIdBatches = splitBatch(beToWarmUpTablets);
        warmUpJob = new CloudWarmUpJob(jobId, stmt.getDstCluster(),
                beToTabletIdBatches, JobType.TABLE, stmt.getTables(), stmt.isForce());
    } else {
        CloudWarmUpJob.Builder builder = new CloudWarmUpJob.Builder()
                .setJobId(jobId)
                .setSrcClusterName(stmt.getSrcCluster())
                .setDstClusterName(stmt.getDstCluster())
                .setJobType(JobType.CLUSTER);
        Map<String, String> properties = stmt.getProperties();
        String syncMode = properties.get("sync_mode");
        if ("periodic".equals(syncMode)) {
            String syncIntervalSecStr = properties.get("sync_interval_sec");
            if (syncIntervalSecStr == null) {
                throw new AnalysisException("No sync_interval_sec is provided");
            }
            long syncIntervalSec;
            try {
                syncIntervalSec = Long.parseLong(syncIntervalSecStr);
            } catch (NumberFormatException e) {
                // Keep the cause, same as the sync_event branch below.
                throw new AnalysisException("Illegal sync_interval_sec: " + syncIntervalSecStr, e);
            }
            builder.setSyncMode(SyncMode.PERIODIC)
                    .setSyncInterval(syncIntervalSec);
        } else if ("event_driven".equals(syncMode)) {
            String syncEventStr = properties.get("sync_event");
            if (syncEventStr == null) {
                throw new AnalysisException("No sync_event is provided");
            }
            CloudWarmUpJob.SyncEvent syncEvent;
            try {
                // Locale.ROOT: enum constant lookup must not depend on the JVM default locale
                // (e.g. the Turkish dotted/dotless i).
                syncEvent = CloudWarmUpJob.SyncEvent.valueOf(syncEventStr.toUpperCase(java.util.Locale.ROOT));
            } catch (IllegalArgumentException e) {
                throw new AnalysisException("Illegal sync_event: " + syncEventStr, e);
            }
            builder.setSyncMode(SyncMode.EVENT_DRIVEN)
                    .setSyncEvent(syncEvent);
            // Handle ON TABLES rules
            List<OnTablesFilter.TableFilterRule> onTablesRules = stmt.getOnTablesRules();
            if (onTablesRules != null && !onTablesRules.isEmpty()) {
                builder.setJobType(JobType.TABLES);
                List<CloudWarmUpJob.PersistedTableFilterRule> persistedRules = new ArrayList<>();
                for (OnTablesFilter.TableFilterRule rule : onTablesRules) {
                    CloudWarmUpJob.PersistedTableFilterRule pr = new CloudWarmUpJob.PersistedTableFilterRule();
                    pr.ruleType = rule.getRuleType().name();
                    pr.pattern = rule.getRawPattern();
                    persistedRules.add(pr);
                }
                builder.setTableFilterRules(persistedRules);
            }
        } else {
            builder.setSyncMode(SyncMode.ONCE);
        }
        warmUpJob = builder.build();
        // For event-driven jobs with ON TABLES, rebuild filter and resolve initial table IDs
        if (warmUpJob.hasTableFilter()) {
            warmUpJob.rebuildOnTablesFilter();
            Map<Long, String> initialTableIdNames = resolveTableIds(warmUpJob.getOnTablesFilter());
            logMatchedTables("created table filter for job " + jobId, initialTableIdNames);
            if (initialTableIdNames.isEmpty()) {
                throw new AnalysisException("No tables matched the ON TABLES filter");
            }
            warmUpJob.setCurrentTableIdNames(initialTableIdNames);
        }
    }
    // Register in memory first, then persist the job through the edit log.
    addCloudWarmUpJob(warmUpJob);
    Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob);
    LOG.info("finished to create cloud warm up job: {}", warmUpJob.getJobId());
    return jobId;
}
/**
 * Cancels the job referenced by a CANCEL WARM UP JOB command.
 *
 * @param stmt command carrying the id of the job to cancel
 * @throws DdlException if the job does not exist or cannot be cancelled
 */
public void cancel(CancelWarmUpJobCommand stmt) throws DdlException {
    long jobId = stmt.getJobId();
    cancel(jobId);
}
/**
 * Cancels a job on behalf of the user, recording "user cancel" as the reason.
 *
 * @param jobId id of the job to cancel
 * @throws DdlException if the job does not exist or cannot be cancelled
 */
public void cancel(long jobId) throws DdlException {
    final String reason = "user cancel";
    cancel(jobId, reason);
}
/**
 * Cancels a job with the given reason.
 *
 * @param jobId id of the job to cancel
 * @param msg   reason handed to the job
 * @throws DdlException if no job is registered under {@code jobId}, or if the job reports
 *                      that it could not be cancelled
 */
public void cancel(long jobId, String msg) throws DdlException {
    CloudWarmUpJob target = cloudWarmUpJobs.get(jobId);
    if (target == null) {
        throw new DdlException("job id: " + jobId + " does not exist.");
    }
    boolean cancelled = target.cancel(msg, true);
    if (!cancelled) {
        throw new DdlException("job can not be cancelled. State: " + target.getJobState());
    }
}
/**
 * Cancels every runnable job that is not done, has a table filter and uses
 * {@code clusterName} as its source or destination cluster. A job that fails to cancel is
 * logged and does not stop the remaining jobs from being processed.
 *
 * @param clusterName name of the compute group that changed
 * @param reason      cancel reason handed to each job
 */
public void cancelTableFilterJobsForClusterChange(String clusterName, String reason) {
    for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
        boolean activeWithFilter = !job.isDone() && job.hasTableFilter();
        if (!activeWithFilter) {
            continue;
        }
        boolean usesCluster = Objects.equals(clusterName, job.getSrcClusterName())
                || Objects.equals(clusterName, job.getDstClusterName());
        if (!usesCluster) {
            continue;
        }
        try {
            cancel(job.getJobId(), reason);
            LOG.info("cancel table-level cloud warm up job {} because compute group {} changed: {}",
                    job.getJobId(), clusterName, reason);
        } catch (DdlException e) {
            LOG.warn("failed to cancel table-level cloud warm up job {} after compute group {} changed",
                    job.getJobId(), clusterName, e);
        }
    }
}
/**
 * Starts every runnable job that is eligible to run now.
 *
 * <p>A job is skipped when it should wait, is done, is already active, or when the number
 * of active jobs has reached {@code Config.max_active_cloud_warm_up_job}. In unit tests the
 * job runs inline; otherwise it is submitted to the warm up thread pool, where it is marked
 * active for the duration of its run.
 */
private void runCloudWarmUpJob() {
    runnableCloudWarmUpJobs.values().forEach(job -> {
        if (job.shouldWait()) {
            return;
        }
        boolean eligible = !job.isDone()
                && !activeCloudWarmUpJobs.containsKey(job.getJobId())
                && activeCloudWarmUpJobs.size() < Config.max_active_cloud_warm_up_job;
        if (!eligible) {
            return;
        }
        if (FeConstants.runningUnitTest) {
            job.run();
            return;
        }
        cloudWarmUpThreadPool.submit(() -> {
            // Another submission of the same job got there first: nothing to do.
            if (activeCloudWarmUpJobs.putIfAbsent(job.getJobId(), job) != null) {
                return;
            }
            try {
                job.run();
            } finally {
                activeCloudWarmUpJobs.remove(job.getJobId());
            }
        });
    });
}
/**
 * Applies a replayed job record: the record replaces whatever is registered under the same
 * job id, its table filter state is restored, and it is either announced as stopped (when
 * done) or registered for repeat detection. A record in state {@code JobState.DELETED} is
 * removed again at the end.
 *
 * @param cloudWarmUpJob the job record read from the edit log
 * @throws Exception propagated from the helpers invoked during replay
 */
public void replayCloudWarmUpJob(CloudWarmUpJob cloudWarmUpJob) throws Exception {
    long jobId = cloudWarmUpJob.getJobId();
    // ATTN: not need to replay, just override the job with the same job id.
    runnableCloudWarmUpJobs.put(jobId, cloudWarmUpJob);
    cloudWarmUpJobs.put(jobId, cloudWarmUpJob);
    LOG.info("replay cloud warm up job {}, state {}", jobId, cloudWarmUpJob.getJobState());
    restoreTableFilterState(cloudWarmUpJob);
    if (!cloudWarmUpJob.isDone()) {
        registerJobForRepeatDetection(cloudWarmUpJob, true);
    } else {
        notifyJobStop(cloudWarmUpJob);
    }
    if (cloudWarmUpJob.jobState != JobState.DELETED) {
        return;
    }
    boolean removedFromBoth = cloudWarmUpJobs.remove(jobId) != null
            && runnableCloudWarmUpJobs.remove(jobId) != null;
    if (removedFromBoth) {
        LOG.info("replay removing expired cloud warm up job {}.", jobId);
    } else {
        // should not happen, but it does no matter, just add a warn log here to observe
        LOG.warn("failed to find cloud warm up job {} when replay removing expired job.",
                jobId);
    }
}
/**
 * Resolves a glob-based ON TABLES filter to the tables it matches, as a map of
 * table ID -> "db.table" name, by iterating all databases and tables of the internal catalog.
 * Only managed tables accepted by the filter are included.
 *
 * @param filter the filter to evaluate; {@code null} yields an empty map
 * @return a new map from table id to "db.table"
 */
public Map<Long, String> resolveTableIds(OnTablesFilter filter) {
    Map<Long, String> matched = new HashMap<>();
    if (filter == null) {
        return matched;
    }
    for (DatabaseIf<? extends TableIf> dbIf : Env.getCurrentInternalCatalog().getAllDbs()) {
        String fullName = dbIf.getFullName();
        // Strip "default_cluster:" prefix if present
        int colon = fullName.indexOf(':');
        String dbName = colon >= 0 ? fullName.substring(colon + 1) : fullName;
        for (String tableName : dbIf.getTableNamesOrEmptyWithLock()) {
            TableIf table = dbIf.getTableNullable(tableName);
            if (table == null || !table.isManagedTable() || !filter.shouldWarmUp(dbName, tableName)) {
                continue;
            }
            matched.put(table.getId(), dbName + "." + tableName);
        }
    }
    return matched;
}
/**
 * Logs the tables matched by a table filter as "id:name" entries ordered by table id.
 *
 * @param action       text describing what triggered the log line
 * @param tableIdNames matched tables, table id to "db.table"
 */
private void logMatchedTables(String action, Map<Long, String> tableIdNames) {
    List<String> idNamePairs = tableIdNames.entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(entry -> entry.getKey() + ":" + entry.getValue())
            .collect(Collectors.toList());
    String matchedTables = CloudWarmUpJob.formatMatchedTablesForDisplay(idNamePairs);
    LOG.info("{}: matched_table_count={}, matched_tables=[{}]",
            action, tableIdNames.size(), matchedTables);
}
/**
 * Re-resolves the matched tables of every running event-driven job that has an ON TABLES
 * filter, so that tables created or dropped since the last pass are picked up.
 * Intended to be called periodically from the daemon loop. A failure on one job is logged
 * and does not affect the other jobs.
 */
public void refreshAllTableFilters() {
    for (CloudWarmUpJob job : runnableCloudWarmUpJobs.values()) {
        boolean refreshable = !job.isDone() && job.isEventDriven() && job.hasTableFilter();
        if (!refreshable) {
            continue;
        }
        try {
            Map<Long, String> resolved = resolveTableIds(job.getOnTablesFilter());
            logMatchedTables("refreshed table filter for job " + job.getJobId(), resolved);
            Set<Long> previousIds = job.getCurrentTableIds();
            if (resolved.equals(job.getCurrentTableIdNames())) {
                // Nothing changed since the last refresh.
                continue;
            }
            job.setCurrentTableIdNames(resolved);
            LOG.info("refreshed table filter for job {}: {} -> {} tables",
                    job.getJobId(),
                    previousIds == null ? 0 : previousIds.size(),
                    resolved.size());
        } catch (Exception e) {
            LOG.warn("failed to refresh table filter for job {}", job.getJobId(), e);
        }
    }
}
}