// CloudWarmUpJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService.Client;
import org.apache.doris.thrift.TDownloadType;
import org.apache.doris.thrift.TJobMeta;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpTabletsRequest;
import org.apache.doris.thrift.TWarmUpTabletsRequestType;
import org.apache.doris.thrift.TWarmUpTabletsResponse;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * A warm up job for one cloud cluster.
 *
 * <p>The job holds, per backend id, a list of tablet id batches and drives the backends through
 * them with {@code warmUpTablets} thrift RPCs: one {@code SET_JOB} request for the first batch,
 * {@code GET_CURRENT_JOB_STATE_AND_LEASE} polls, one {@code SET_BATCH} request per following batch
 * and a final {@code CLEAR_JOB}. Each call to {@link #run()} advances the job by at most one step.
 *
 * <p>The job is written as JSON through {@code GsonUtils.GSON}. NOTE(review): presumably only the
 * {@code @SerializedName} fields are persisted; confirm against the GsonUtils configuration.
 */
public class CloudWarmUpJob implements Writable {
    private static final Logger LOG = LogManager.getLogger(CloudWarmUpJob.class);

    /**
     * The job is cancelled once this many rounds of the running state have failed with an exception.
     *
     * <p>Kept as a constant rather than an instance field: this class has no no-arg constructor, so
     * an instance created by Gson in {@link #read(DataInput)} may not run instance field
     * initializers, which would leave an instance-level limit at 0.
     */
    private static final int MAX_RETRY_TIME = 3;

    /** Life cycle states of a warm up job. */
    public enum JobState {
        /** Initial state set by the constructor. */
        PENDING,
        /** Batches are being sent to the backends. */
        RUNNING,
        /** All batches were handed out and reported done. */
        FINISHED,
        /** The job was cancelled, timed out, was rejected by a backend or exhausted its retries. */
        CANCELLED,
        /** Final state that is never assigned inside this class. */
        DELETED;

        /** Returns true for FINISHED, CANCELLED and DELETED. */
        public boolean isFinalState() {
            return this == JobState.FINISHED || this == JobState.CANCELLED || this == JobState.DELETED;
        }
    }

    /** What the job was created for. The value is only stored and reported by this class. */
    public enum JobType {
        CLUSTER,
        TABLE;
    }

    @SerializedName(value = "jobId")
    protected long jobId;
    @SerializedName(value = "jobState")
    protected JobState jobState;
    @SerializedName(value = "createTimeMs")
    protected long createTimeMs = -1;
    @SerializedName(value = "errMsg")
    protected String errMsg = "";
    @SerializedName(value = "finishedTimeMs")
    protected long finishedTimeMs = -1;
    @SerializedName(value = "cloudClusterName")
    protected String cloudClusterName = "";
    /** Index of the last batch known to be done on all backends, -1 before the first one. */
    @SerializedName(value = "lastBatchId")
    protected long lastBatchId = -1;
    /** Backend id to the ordered list of tablet id batches for that backend. */
    @SerializedName(value = "beToTabletIdBatches")
    protected Map<Long, List<List<Long>>> beToTabletIdBatches;
    /** Backend id to its thrift endpoint in {@code host:port} form. */
    @SerializedName(value = "beToThriftAddress")
    protected Map<Long, String> beToThriftAddress = new HashMap<>();
    // The serialized name keeps its historical capitalization; changing it would break old metadata.
    @SerializedName(value = "JobType")
    protected JobType jobType;

    // Thrift clients borrowed from ClientPool.backendPool, keyed by backend id; null when released.
    private Map<Long, Client> beToClient;
    // Address each client was borrowed with, needed to hand the client back to the pool.
    private Map<Long, TNetworkAddress> beToAddr;
    // Number of rounds of the running state that ended with an exception; never reset.
    private int retryTime = 0;
    // True when the previous round failed, so the next round re-sends the batch instead of advancing.
    private boolean retry = false;
    // True once the SET_JOB round has been attempted.
    private boolean setJobDone = false;

    /**
     * Creates a PENDING job and records the thrift address of every backend of the cluster.
     *
     * @param jobId id of the job
     * @param cloudClusterName name of the cluster to warm up
     * @param beToTabletIdBatches backend id to the tablet id batches for that backend
     * @param jobType type of the job
     */
    public CloudWarmUpJob(long jobId, String cloudClusterName,
                          Map<Long, List<List<Long>>> beToTabletIdBatches, JobType jobType) {
        this.jobId = jobId;
        this.jobState = JobState.PENDING;
        this.cloudClusterName = cloudClusterName;
        this.beToTabletIdBatches = beToTabletIdBatches;
        this.createTimeMs = System.currentTimeMillis();
        this.jobType = jobType;
        // The backend lookup is skipped while unit tests are running.
        if (!FeConstants.runningUnitTest) {
            List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
                    .getBackendsByClusterName(cloudClusterName);
            for (Backend backend : backends) {
                beToThriftAddress.put(backend.getId(), backend.getHost() + ":" + backend.getBePort());
            }
        }
    }

    public long getJobId() {
        return jobId;
    }

    public JobState getJobState() {
        return jobState;
    }

    public long getCreateTimeMs() {
        return createTimeMs;
    }

    public String getErrMsg() {
        return errMsg;
    }

    public long getFinishedTimeMs() {
        return finishedTimeMs;
    }

    public long getLastBatchId() {
        return lastBatchId;
    }

    /** Returns the internal map itself, not a copy. */
    public Map<Long, List<List<Long>>> getBeToTabletIdBatches() {
        return beToTabletIdBatches;
    }

    /** Returns the internal map itself, not a copy. */
    public Map<Long, String> getBeToThriftAddress() {
        return beToThriftAddress;
    }

    public JobType getJobType() {
        return jobType;
    }

    /**
     * Builds one row describing the job.
     *
     * @return in order: job id, cluster name, state, type, create time, {@code lastBatchId + 1},
     *         the largest number of batches of any backend, finish time and error message
     */
    public List<String> getJobInfo() {
        List<String> info = Lists.newArrayList();
        info.add(String.valueOf(jobId));
        info.add(cloudClusterName);
        info.add(jobState.name());
        info.add(jobType.name());
        info.add(TimeUtils.longToTimeStringWithms(createTimeMs));
        info.add(Long.toString(lastBatchId + 1));
        long maxBatchSize = 0;
        for (List<List<Long>> batches : beToTabletIdBatches.values()) {
            maxBatchSize = Math.max(maxBatchSize, batches.size());
        }
        info.add(Long.toString(maxBatchSize));
        info.add(TimeUtils.longToTimeStringWithms(finishedTimeMs));
        info.add(errMsg);
        return info;
    }

    public void setJobState(JobState jobState) {
        this.jobState = jobState;
    }

    public void setCreateTimeMs(long timeMs) {
        this.createTimeMs = timeMs;
    }

    public void setErrMsg(String msg) {
        this.errMsg = msg;
    }

    public void setFinishedTimeMs(long timeMs) {
        this.finishedTimeMs = timeMs;
    }

    public void setCloudClusterName(String name) {
        this.cloudClusterName = name;
    }

    public void setLastBatchId(long id) {
        this.lastBatchId = id;
    }

    public void setBeToTabletIdBatches(Map<Long, List<List<Long>>> m) {
        this.beToTabletIdBatches = m;
    }

    public void setBeToThriftAddress(Map<Long, String> m) {
        this.beToThriftAddress = m;
    }

    public void setJobType(JobType t) {
        this.jobType = t;
    }

    /** Returns true when the job is in a final state. */
    public boolean isDone() {
        return jobState.isFinalState();
    }

    /** Returns true when more than {@code Config.cloud_warm_up_timeout_second} passed since creation. */
    public boolean isTimeout() {
        return (System.currentTimeMillis() - createTimeMs) / 1000 > Config.cloud_warm_up_timeout_second;
    }

    /**
     * Returns true when the job is done and finished more than
     * {@code Config.history_cloud_warm_up_job_keep_max_second} seconds ago.
     */
    public boolean isExpire() {
        return isDone() && (System.currentTimeMillis() - finishedTimeMs) / 1000
                > Config.history_cloud_warm_up_job_keep_max_second;
    }

    public String getCloudClusterName() {
        return cloudClusterName;
    }

    /**
     * Advances the job by one step according to its current state.
     *
     * <p>A timed out job is cancelled instead. In cloud mode a {@link ConnectContext} bound to the
     * job's cluster is installed as thread local for the duration of the step. Exceptions raised by
     * the step are logged and not propagated.
     */
    public synchronized void run() {
        if (isTimeout()) {
            cancel("Timeout");
            return;
        }
        if (Config.isCloudMode()) {
            LOG.debug("set context to job");
            ConnectContext ctx = new ConnectContext();
            ctx.setThreadLocalInfo();
            ctx.setCloudCluster(cloudClusterName);
        }
        try {
            switch (jobState) {
                case PENDING:
                    runPendingJob();
                    break;
                case RUNNING:
                    runRunningJob();
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            // Pass the throwable to the logger so the stack trace lands in the FE log.
            LOG.warn("state {} exception {}", jobState, e.getMessage(), e);
        } finally {
            if (Config.isCloudMode()) {
                LOG.debug("remove context from job");
                ConnectContext.remove();
            }
        }
    }

    /**
     * Borrows one thrift client per entry of {@code beToThriftAddress} unless clients are already held.
     *
     * <p>If any backend fails, every client borrowed so far is handed back and the client maps are
     * reset before the failure is rethrown.
     *
     * @throws Exception declared for callers; failures are raised as a {@link RuntimeException}
     *         wrapping the original cause
     */
    public void initClients() throws Exception {
        if (beToClient == null) {
            beToClient = new HashMap<>();
            beToAddr = new HashMap<>();
        }
        if (!beToClient.isEmpty()) {
            return;
        }
        for (Map.Entry<Long, String> entry : beToThriftAddress.entrySet()) {
            boolean ok = false;
            TNetworkAddress address = null;
            Client client = null;
            try {
                address = parseAddress(entry.getValue());
                beToAddr.put(entry.getKey(), address);
                client = ClientPool.backendPool.borrowObject(address);
                beToClient.put(entry.getKey(), client);
                ok = true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (!ok) {
                    // The failing client never reached beToClient, so it is dropped here; the
                    // clients borrowed for earlier backends are handed back by releaseClients().
                    if (client != null) {
                        ClientPool.backendPool.invalidateObject(address, client);
                    }
                    releaseClients();
                }
            }
        }
    }

    /**
     * Parses a {@code host:port} string, splitting on the last colon so that a host which itself
     * contains colons (a raw IPv6 literal) is kept intact.
     */
    private static TNetworkAddress parseAddress(String hostPort) {
        int sep = hostPort.lastIndexOf(':');
        if (sep <= 0 || sep == hostPort.length() - 1) {
            throw new IllegalArgumentException("invalid backend thrift address: " + hostPort);
        }
        return new TNetworkAddress(hostPort.substring(0, sep), Integer.parseInt(hostPort.substring(sep + 1)));
    }

    /** Returns every held client to {@code ClientPool.backendPool} and forgets the client maps. */
    public void releaseClients() {
        if (beToClient != null) {
            for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                ClientPool.backendPool.returnObject(beToAddr.get(entry.getKey()),
                        entry.getValue());
            }
        }
        beToClient = null;
        beToAddr = null;
    }

    /**
     * Cancels the job unless it is already in a final state.
     *
     * <p>CLEAR_JOB is sent to the backends on a best-effort basis; the job is moved to CANCELLED,
     * written to the edit log and its cluster removed from the runnable cluster set even when no
     * backend can be reached.
     *
     * @param errMsg reason stored as the job's error message
     * @return false if the job was already in a final state, true otherwise
     */
    public final synchronized boolean cancel(String errMsg) {
        if (this.jobState.isFinalState()) {
            return false;
        }
        try {
            initClients();
            clearJobOnBackends();
        } catch (Exception e) {
            LOG.warn("warm up job {} cancel exception: {}", jobId, e.getMessage(), e);
        } finally {
            releaseClients();
        }
        this.errMsg = errMsg;
        LOG.info("cancel cloud warm up job {}, err {}", jobId, errMsg);
        finalizeJob(JobState.CANCELLED);
        return true;
    }

    /**
     * Sends CLEAR_JOB to every backend a client is held for. A failure on one backend is logged and
     * does not stop the requests to the remaining backends.
     */
    private void clearJobOnBackends() {
        if (beToClient == null) {
            return;
        }
        for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
            try {
                TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                request.setType(TWarmUpTabletsRequestType.CLEAR_JOB);
                request.setJobId(jobId);
                LOG.info("send warm up request. request_type=CLEAR_JOB");
                entry.getValue().warmUpTablets(request);
            } catch (Exception e) {
                LOG.warn("warm up job {} failed to send CLEAR_JOB to backend {}", jobId, entry.getKey(), e);
            }
        }
    }

    /**
     * Moves the job to {@code finalState}: stamps the finish time, removes the cluster from the
     * runnable cluster set and writes the job to the edit log.
     */
    private void finalizeJob(JobState finalState) {
        this.jobState = finalState;
        this.finishedTimeMs = System.currentTimeMillis();
        ((CloudEnv) Env.getCurrentEnv()).getCacheHotspotMgr().getRunnableClusterSet().remove(this.cloudClusterName);
        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
    }

    /** Clears the job on the backends (best effort), releases the clients and finalizes the job. */
    private void finishJob(JobState finalState) {
        clearJobOnBackends();
        releaseClients();
        finalizeJob(finalState);
    }

    /** Moves a PENDING job to RUNNING and writes it to the edit log. */
    private void runPendingJob() throws DdlException {
        Preconditions.checkState(jobState == JobState.PENDING, jobState);
        // Todo: nothing to prepare yet
        this.jobState = JobState.RUNNING;
        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
        LOG.info("transfer cloud warm up job {} state to {}", jobId, this.jobState);
    }

    /**
     * Builds the job metas of one batch for one backend.
     *
     * @param beId backend id
     * @param batchId index of the batch
     * @return a single meta holding the batch's tablet ids, or an empty list when the backend has
     *         no batches or {@code batchId} is past its last batch
     */
    private List<TJobMeta> buildJobMetas(long beId, long batchId) {
        List<TJobMeta> jobMetas = new ArrayList<>();
        List<List<Long>> tabletIdBatches = beToTabletIdBatches.get(beId);
        // A backend can have a thrift address without having any tablets assigned.
        if (tabletIdBatches != null && batchId < tabletIdBatches.size()) {
            TJobMeta jobMeta = new TJobMeta();
            jobMeta.setDownloadType(TDownloadType.S3);
            jobMeta.setTabletIds(tabletIdBatches.get((int) batchId));
            jobMetas.add(jobMeta);
        }
        return jobMetas;
    }

    /** Builds a SET_JOB / SET_BATCH request carrying batch {@code lastBatchId + 1} of one backend. */
    private TWarmUpTabletsRequest newBatchRequest(TWarmUpTabletsRequestType type, long beId) {
        TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
        request.setType(type);
        request.setJobId(jobId);
        request.setBatchId(lastBatchId + 1);
        request.setJobMetas(buildJobMetas(beId, request.batch_id));
        return request;
    }

    /**
     * Checks a backend response.
     *
     * @return true when the status is not OK; in that case the first error message of the response,
     *         if any, is stored as the job's error message
     */
    private boolean recordFailure(TWarmUpTabletsResponse response) {
        if (response.getStatus().getStatusCode() == TStatusCode.OK) {
            return false;
        }
        List<String> errorMsgs = response.getStatus().getErrorMsgs();
        if (errorMsgs != null && !errorMsgs.isEmpty()) {
            errMsg = errorMsgs.get(0);
        }
        return true;
    }

    /**
     * Runs one round of a RUNNING job.
     *
     * <p>The first round sends SET_JOB with batch 0. Every later round polls all backends; once no
     * backend reports pending work, the next batch is sent with SET_BATCH, or the job is finished
     * when no backend has a batch left. A non-OK response cancels the job. An exception counts as a
     * failed round: the clients are released and the job is cancelled once {@link #MAX_RETRY_TIME}
     * rounds have failed.
     */
    private void runRunningJob() throws Exception {
        Preconditions.checkState(jobState == JobState.RUNNING, jobState);
        if (FeConstants.runningUnitTest) {
            Thread.sleep(1000);
            finalizeJob(JobState.FINISHED);
            return;
        }
        boolean changeToCancelState = false;
        try {
            initClients();
            // If there is first batch, send SET_JOB RPC
            if (lastBatchId == -1 && !setJobDone) {
                setJobDone = true;
                for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                    TWarmUpTabletsRequest request =
                            newBatchRequest(TWarmUpTabletsRequestType.SET_JOB, entry.getKey());
                    LOG.info("send warm up request. job_id={}, batch_id={}, job_sizes={}, request_type=SET_JOB",
                            jobId, request.batch_id, request.job_metas.size());
                    if (recordFailure(entry.getValue().warmUpTablets(request))) {
                        changeToCancelState = true;
                    }
                }
            } else {
                // Check the batches of all BEs done
                boolean allLastBatchDone = true;
                for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                    TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                    request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE);
                    LOG.info("send warm up request. request_type=GET_CURRENT_JOB_STATE_AND_LEASE");
                    TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                    if (recordFailure(response)) {
                        changeToCancelState = true;
                    }
                    if (!changeToCancelState && response.pending_job_size != 0) {
                        allLastBatchDone = false;
                        break;
                    }
                }
                if (!changeToCancelState && allLastBatchDone) {
                    if (retry) {
                        // RPC failed, retry
                        retry = false;
                    } else {
                        // last batch is done, log and do next batch
                        lastBatchId++;
                        Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
                    }
                    boolean allBatchesDone = true;
                    for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                        TWarmUpTabletsRequest request =
                                newBatchRequest(TWarmUpTabletsRequestType.SET_BATCH, entry.getKey());
                        if (request.job_metas.isEmpty()) {
                            // this backend has no batch left
                            continue;
                        }
                        allBatchesDone = false;
                        LOG.info("send warm up request. job_id={}, batch_id={}, "
                                        + "job_sizes={}, request_type=SET_BATCH",
                                jobId, request.batch_id, request.job_metas.size());
                        if (recordFailure(entry.getValue().warmUpTablets(request))) {
                            changeToCancelState = true;
                        }
                    }
                    if (allBatchesDone) {
                        // No RPC was sent in the loop above, so changeToCancelState is still false.
                        finishJob(JobState.FINISHED);
                    }
                }
            }
            if (changeToCancelState) {
                // a backend rejected a request, release job
                finishJob(JobState.CANCELLED);
            }
        } catch (Exception e) {
            retryTime++;
            retry = true;
            if (retryTime < MAX_RETRY_TIME) {
                LOG.warn("warm up job {} exception: {}", jobId, e.getMessage(), e);
            } else {
                // retry three times and release job
                LOG.warn("warm up job {} failed {} times, cancel it", jobId, retryTime, e);
                this.errMsg = "retry the warm up job until max retry time " + MAX_RETRY_TIME;
                finalizeJob(JobState.CANCELLED);
            }
            releaseClients();
        }
    }

    /** Replay hook; intentionally empty. */
    public void replay() throws Exception {
        // No need to replay anything yet
    }

    /** Writes the job as a JSON string produced by {@code GsonUtils.GSON}. */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this, CloudWarmUpJob.class);
        Text.writeString(out, json);
    }

    /** Reads a job previously written by {@link #write(DataOutput)}. */
    public static CloudWarmUpJob read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, CloudWarmUpJob.class);
    }
}