AbstractJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Data
@Log4j2
public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {

    /** Task-counter columns (succeeded, failed, canceled task counts), all typed as strings. */
    public static final ImmutableList<Column> COMMON_SCHEMA = ImmutableList.of(
            new Column("SucceedTaskCount", ScalarType.createStringType()),
            new Column("FailedTaskCount", ScalarType.createStringType()),
            new Column("CanceledTaskCount", ScalarType.createStringType())
    );

    // Persisted state. The short @SerializedName keys are the JSON names Gson reads back in
    // readFields; renaming a key would stop previously written values from mapping onto the field.
    @SerializedName(value = "jid")
    private Long jobId;
    @SerializedName(value = "jn")
    private String jobName;
    @SerializedName(value = "js")
    private JobStatus jobStatus;
    @SerializedName(value = "cdb")
    private String currentDbName;
    @SerializedName(value = "c")
    private String comment;
    @SerializedName(value = "cu")
    private UserIdentity createUser;
    @SerializedName(value = "jc")
    private JobExecutionConfiguration jobConfig;
    /** Creation time in epoch millis (rendered with TimeUtils.longToTimeString for display). */
    @SerializedName(value = "ctms")
    private long createTimeMs;
    /** Set to the current time by {@link #initTasks}; -1 until then. */
    @SerializedName(value = "stm")
    private long startTimeMs = -1L;
    /** Recorded when the job becomes FINISHED, or STOPPED after a failed INSTANT run. */
    @SerializedName(value = "ftm")
    private long finishTimeMs;
    @SerializedName(value = "sql")
    String executeSql;
    @SerializedName(value = "stc")
    private AtomicLong succeedTaskCount = new AtomicLong(0);
    @SerializedName(value = "ftc")
    private AtomicLong failedTaskCount = new AtomicLong(0);
    @SerializedName(value = "ctc")
    private AtomicLong canceledTaskCount = new AtomicLong(0);

    public AbstractJob() {
    }

    public AbstractJob(Long id) {
        jobId = id;
    }

    /**
     * Creates a job whose creation time is "now" and whose executeSql is null.
     * executeSql and runningTasks are not required for load jobs.
     */
    public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
                       String currentDbName,
                       String comment,
                       UserIdentity createUser,
                       JobExecutionConfiguration jobConfig) {
        this(jobId, jobName, jobStatus, currentDbName, comment,
                createUser, jobConfig, System.currentTimeMillis(), null);
    }

    /**
     * Full constructor.
     *
     * @param createTimeMs creation time in epoch millis; must not be null (it is unboxed)
     * @param executeSql   SQL shown for this job, may be null
     */
    public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
                       String currentDbName,
                       String comment,
                       UserIdentity createUser,
                       JobExecutionConfiguration jobConfig,
                       Long createTimeMs,
                       String executeSql) {
        this.jobId = jobId;
        this.jobName = jobName;
        this.jobStatus = jobStatus;
        this.currentDbName = currentDbName;
        this.comment = comment;
        this.createUser = createUser;
        this.jobConfig = jobConfig;
        this.createTimeMs = createTimeMs;
        this.executeSql = executeSql;
    }

    /** In-memory tasks not yet finished. Has no @SerializedName; readFields recreates it. */
    private CopyOnWriteArrayList<T> runningTasks = new CopyOnWriteArrayList<>();

    /** Guards task creation in {@link #commonCreateTasks}; recreated by readFields. */
    private Lock createTaskLock = new ReentrantLock();

    /**
     * Cancels every running task.
     *
     * <p>Each task is counted as canceled. The running list is then replaced with an empty one,
     * and an update is written to the edit log. Does nothing when no task is running.
     *
     * @param needWaitCancelComplete forwarded to each task's {@code cancel}
     * @throws JobException if a task fails to cancel
     */
    @Override
    public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
        if (CollectionUtils.isEmpty(runningTasks)) {
            return;
        }
        for (T task : runningTasks) {
            task.cancel(needWaitCancelComplete);
            canceledTaskCount.incrementAndGet();
        }
        runningTasks = new CopyOnWriteArrayList<>();
        logUpdateOperation();
    }

    /** Column titles for SHOW output; order matches {@link #getCommonShowInfo()}. */
    private static final ImmutableList<String> TITLE_NAMES =
            new ImmutableList.Builder<String>()
                    .add("Id")
                    .add("Name")
                    .add("Definer")
                    .add("ExecuteType")
                    .add("RecurringStrategy")
                    .add("Status")
                    .add("ExecuteSql")
                    .add("CreateTime")
                    .add("Comment")
                    .build();

    /**
     * Produces a job id from {@code System.nanoTime()} plus a random int offset.
     * Collisions are unlikely but not impossible.
     */
    protected static long getNextJobId() {
        return System.nanoTime() + RandomUtils.nextInt();
    }

    /**
     * Cancels one running task (waiting for completion) and removes it from the running list.
     * A ONE_TIME job is marked FINISHED afterwards.
     *
     * @throws JobException if nothing is running or no running task has {@code taskId}
     */
    @Override
    public void cancelTaskById(long taskId) throws JobException {
        if (CollectionUtils.isEmpty(runningTasks)) {
            throw new JobException("no running task");
        }
        runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
                .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true);
        runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
        canceledTaskCount.incrementAndGet();
        if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
            updateJobStatus(JobStatus.FINISHED);
        }
    }

    /**
     * Lists all tasks of this job for the SHOW command.
     *
     * <p>When tasks are running, returns the history from {@link #queryTasks()} merged with any
     * running task not already in the history, sorted by creation time, newest first. When
     * nothing is running, returns {@code queryTasks()} as-is.
     */
    public List<T> queryAllTasks() {
        if (CollectionUtils.isEmpty(runningTasks)) {
            return queryTasks();
        }
        List<T> tasks = new ArrayList<>();
        List<T> historyTasks = queryTasks();
        // queryTasks() may return null or an empty list; only copy when there is something.
        if (CollectionUtils.isNotEmpty(historyTasks)) {
            tasks.addAll(historyTasks);
        }
        Set<Long> historyTaskIds = tasks.stream().map(AbstractTask::getTaskId).collect(Collectors.toSet());
        runningTasks.forEach(task -> {
            // Avoid listing a task twice when it is both in history and still running.
            if (!historyTaskIds.contains(task.getTaskId())) {
                tasks.add(task);
            }
        });
        Comparator<T> taskComparator = Comparator.comparingLong(T::getCreateTimeMs).reversed();
        tasks.sort(taskComparator);
        return tasks;
    }

    /**
     * Creates tasks of the given type if the job's status and readiness allow it.
     *
     * <p>Readiness is checked once without the lock and again while holding
     * {@code createTaskLock}, so the final decision and creation happen together.
     *
     * @return the created tasks, or an empty list when the job cannot schedule now
     */
    public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
        if (!canCreateTask(taskType)) {
            log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId,
                    jobStatus, taskType);
            return new ArrayList<>();
        }
        if (!isReadyForScheduling(taskContext)) {
            log.info("job is not ready for scheduling, job id is {}", jobId);
            return new ArrayList<>();
        }
        // Acquire before entering try so the finally block never unlocks a lock that isn't held.
        // it's better to use tryLock and add timeout limit
        createTaskLock.lock();
        try {
            // Re-check under the lock: readiness may have changed while waiting for it.
            if (!isReadyForScheduling(taskContext)) {
                log.info("job is not ready for scheduling, job id is {}", jobId);
                return new ArrayList<>();
            }
            List<T> tasks = createTasks(taskType, taskContext);
            tasks.forEach(task -> log.info("common create task, job id is {}, task id is {}", jobId, task.getTaskId()));
            return tasks;
        } finally {
            createTaskLock.unlock();
        }
    }

    /**
     * Checks whether the current status permits a task of this type.
     *
     * <p>SCHEDULED tasks require RUNNING. MANUAL tasks are also allowed while PAUSED.
     */
    private boolean canCreateTask(TaskType taskType) {
        JobStatus currentJobStatus = getJobStatus();
        switch (taskType) {
            case SCHEDULED:
                return currentJobStatus.equals(JobStatus.RUNNING);
            case MANUAL:
                return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED);
            default:
                throw new IllegalArgumentException("Unsupported TaskType: " + taskType);
        }
    }

    /**
     * Prepares newly created tasks and registers them as running.
     *
     * <p>Each task gets the type, this job's id, the current creation time and PENDING status.
     * The tasks are then added to the running list, and the job's start time is set to now.
     */
    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
        tasks.forEach(task -> {
            task.setTaskType(taskType);
            task.setJobId(getJobId());
            task.setCreateTimeMs(System.currentTimeMillis());
            task.setStatus(TaskStatus.PENDING);
        });
        getRunningTasks().addAll(tasks);
        this.startTimeMs = System.currentTimeMillis();
    }

    /**
     * Initializes the execution configuration's parameters, if a configuration is set.
     * Some of the logic is not idempotent, so call this only once per job.
     */
    public void initParams() {
        if (jobConfig != null) {
            jobConfig.initParams();
        }
    }

    /**
     * Validates the job: id and configuration must be present, then the configuration's own
     * checks and the subclass checks run.
     *
     * @throws IllegalArgumentException if jobId or jobConfig is null
     */
    public void checkJobParams() {
        if (null == jobId) {
            throw new IllegalArgumentException("jobId cannot be null");
        }
        if (null == jobConfig) {
            throw new IllegalArgumentException("jobConfig cannot be null");
        }
        jobConfig.checkParams();
        checkJobParamsInternal();
    }

    /**
     * Moves the job to a new status.
     *
     * <ul>
     *   <li>Setting the current status again is a no-op.</li>
     *   <li>RUNNING is only reachable from PAUSED, and STOPPED only from RUNNING.</li>
     *   <li>FINISHED records {@code finishTimeMs}.</li>
     *   <li>PAUSED cancels all running tasks and waits for the cancels to complete.</li>
     *   <li>STOPPED cancels all running tasks without waiting.</li>
     * </ul>
     *
     * @throws IllegalArgumentException if {@code newJobStatus} is null or the transition is not allowed
     * @throws JobException if cancelling running tasks fails
     */
    public void updateJobStatus(JobStatus newJobStatus) throws JobException {
        if (null == newJobStatus) {
            throw new IllegalArgumentException("jobStatus cannot be null");
        }
        if (jobStatus == newJobStatus) {
            return;
        }
        String errorMsg = String.format("Can't update job %s status to the %s status",
                jobStatus.name(), newJobStatus.name());
        if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) {
            throw new IllegalArgumentException(errorMsg);
        }
        if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
            throw new IllegalArgumentException(errorMsg);
        }
        if (newJobStatus.equals(JobStatus.FINISHED)) {
            this.finishTimeMs = System.currentTimeMillis();
        }
        if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) {
            // Pausing waits for cancellation to complete; stopping does not.
            cancelAllTasks(!JobStatus.STOPPED.equals(newJobStatus));
        }
        jobStatus = newJobStatus;
    }

    /** Subclass-specific validation, run last by {@link #checkJobParams()}. */
    protected abstract void checkJobParamsInternal();

    /**
     * Deserializes a job written as a JSON string.
     *
     * <p>Fields without a serialized form (the running task list and the creation lock) are
     * recreated empty.
     */
    public static AbstractJob readFields(DataInput in) throws IOException {
        String jsonJob = Text.readString(in);
        AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
        job.runningTasks = new CopyOnWriteArrayList<>();
        job.createTaskLock = new ReentrantLock();
        return job;
    }

    /** Writes a create-job entry for this job to the current Env's edit log. */
    public void logCreateOperation() {
        Env.getCurrentEnv().getEditLog().logCreateJob(this);
    }

    /** Writes a delete-job entry for this job to the current Env's edit log. */
    public void logDeleteOperation() {
        Env.getCurrentEnv().getEditLog().logDeleteJob(this);
    }

    /** Writes an update-job entry for this job to the current Env's edit log. */
    public void logUpdateOperation() {
        Env.getCurrentEnv().getEditLog().logUpdateJob(this);
    }

    /**
     * Records a failed task, applies any end-of-job transition, and persists the update.
     *
     * <p>The task is removed from the running list first. Otherwise a STOPPED transition, for
     * example after a failed INSTANT run, would cancel the already-failed task and also count
     * it as canceled.
     */
    @Override
    public void onTaskFail(T task) throws JobException {
        failedTaskCount.incrementAndGet();
        runningTasks.remove(task);
        updateJobStatusIfEnd(false, task.getTaskType());
        logUpdateOperation();
    }

    /**
     * Records a succeeded task, applies any end-of-job transition, and persists the update.
     *
     * <p>The task leaves the running list before the status transition, so any transition
     * that cancels running tasks never touches it.
     */
    @Override
    public void onTaskSuccess(T task) throws JobException {
        succeedTaskCount.incrementAndGet();
        runningTasks.remove(task);
        updateJobStatusIfEnd(true, task.getTaskType());
        logUpdateOperation();
    }

    /**
     * Decides whether a completed task ends the job.
     *
     * <p>MANUAL jobs and manually triggered tasks never end the job. Otherwise:
     * <ul>
     *   <li>ONE_TIME: FINISHED.</li>
     *   <li>INSTANT: FINISHED on success, STOPPED on failure.</li>
     *   <li>RECURRING: FINISHED once the next interval would start after the timer's end time.</li>
     * </ul>
     */
    private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException {
        JobExecuteType executeType = getJobConfig().getExecuteType();
        if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) {
            return;
        }
        switch (executeType) {
            case ONE_TIME:
                updateJobStatus(JobStatus.FINISHED);
                this.finishTimeMs = System.currentTimeMillis();
                break;
            case INSTANT:
                this.finishTimeMs = System.currentTimeMillis();
                if (taskSuccess) {
                    updateJobStatus(JobStatus.FINISHED);
                } else {
                    updateJobStatus(JobStatus.STOPPED);
                }
                break;
            case RECURRING:
                TimerDefinition timerDefinition = getJobConfig().getTimerDefinition();
                // End the job when there is no room left for another interval before the end time.
                if (null != timerDefinition.getEndTimeMs()
                        && timerDefinition.getEndTimeMs() < System.currentTimeMillis()
                        + timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) {
                    this.finishTimeMs = System.currentTimeMillis();
                    updateJobStatus(JobStatus.FINISHED);
                }
                break;
            default:
                break;
        }
    }

    /**
     * Gets the job's common show info, used by statements such as SHOW JOBS.
     *
     * @return one value per column of TITLE_NAMES, in the same order
     */
    public List<String> getCommonShowInfo() {
        List<String> commonShowInfo = new ArrayList<>();
        commonShowInfo.add(String.valueOf(jobId));
        commonShowInfo.add(jobName);
        commonShowInfo.add(createUser.getQualifiedUser());
        commonShowInfo.add(jobConfig.getExecuteType().name());
        commonShowInfo.add(jobConfig.convertRecurringStrategyToString());
        commonShowInfo.add(jobStatus.name());
        commonShowInfo.add(executeSql);
        commonShowInfo.add(TimeUtils.longToTimeString(createTimeMs));
        commonShowInfo.add(comment);
        return commonShowInfo;
    }

    /**
     * Builds the table-valued-function row for this job.
     *
     * <p>Values, in order: id, name, definer, execute type, recurring strategy, status,
     * execute SQL, create time, the succeeded/failed/canceled task counts, and comment.
     */
    public TRow getCommonTvfInfo() {
        TRow trow = new TRow();
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobId)));
        trow.addToColumnValue(new TCell().setStringVal(jobName));
        trow.addToColumnValue(new TCell().setStringVal(createUser.getQualifiedUser()));
        trow.addToColumnValue(new TCell().setStringVal(jobConfig.getExecuteType().name()));
        trow.addToColumnValue(new TCell().setStringVal(jobConfig.convertRecurringStrategyToString()));
        trow.addToColumnValue(new TCell().setStringVal(jobStatus.name()));
        trow.addToColumnValue(new TCell().setStringVal(executeSql));
        trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs)));
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(succeedTaskCount.get())));
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(failedTaskCount.get())));
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(canceledTaskCount.get())));
        trow.addToColumnValue(new TCell().setStringVal(comment));
        return trow;
    }

    @Override
    public List<String> getShowInfo() {
        return getCommonShowInfo();
    }

    @Override
    public TRow getTvfInfo() {
        return getCommonTvfInfo();
    }

    /**
     * Generates a common error message when the execution queue is full.
     *
     * @param taskId                  The ID of the task.
     * @param queueConfigName         The name of the queue configuration.
     * @param executeThreadConfigName The name of the execution thread configuration.
     * @return A formatted error message.
     */
    protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName,
                                                         String executeThreadConfigName) {
        return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, "
                        + "you can increase the queue size by setting the property "
                        + "%s in the fe.conf file or increase the value of "
                        + "the property %s in the fe.conf file", getJobId(), getJobName(), taskId, queueConfigName,
                executeThreadConfigName);
    }

    /** Result metadata for SHOW output: one VARCHAR(30) column per entry of TITLE_NAMES. */
    @Override
    public ShowResultSetMetaData getJobMetaData() {
        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
        for (String title : TITLE_NAMES) {
            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
        }
        return builder.build();
    }

    /** No-op by default; subclasses may override. */
    @Override
    public void onRegister() throws JobException {
    }

    /** No-op by default; subclasses may override. */
    @Override
    public void onUnRegister() throws JobException {
    }

    /** Logs the replayed creation of this job. */
    @Override
    public void onReplayCreate() throws JobException {
        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay create scheduler job").build());
    }

    /** Logs the replayed end of this job; {@code replayJob} is not used by this default. */
    @Override
    public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", "replay delete scheduler job").build());
    }

    /** Whether this job should be persisted; always true here, subclasses may override. */
    public boolean needPersist() {
        return true;
    }
}