AbstractTask.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.RandomUtils;
import java.util.UUID;
@Data
@Log4j2
public abstract class AbstractTask implements Task {
@SerializedName(value = "jid")
private Long jobId;
@SerializedName(value = "tid")
private Long taskId;
@SerializedName(value = "st")
private TaskStatus status;
@SerializedName(value = "ctm")
private Long createTimeMs;
@SerializedName(value = "stm")
private Long startTimeMs;
@SerializedName(value = "ftm")
private Long finishTimeMs;
@SerializedName(value = "tt")
private TaskType taskType;
@SerializedName(value = "emg")
private String errMsg;
public AbstractTask() {
taskId = getNextTaskId();
}
private static long getNextTaskId() {
// do not use Env.getNextId(), just generate id without logging
return System.nanoTime() + RandomUtils.nextInt();
}
@Override
public boolean onFail() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return false;
}
status = TaskStatus.FAILED;
if (!isCallable()) {
return false;
}
Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
return true;
}
@Override
public void onFail(String errMsg) throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
status = TaskStatus.FAILED;
setFinishTimeMs(System.currentTimeMillis());
setErrMsg(errMsg);
if (!isCallable()) {
return;
}
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
job.onTaskFail(this);
}
private boolean isCallable() {
if (status.equals(TaskStatus.CANCELED)) {
return false;
}
if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
return true;
}
return false;
}
/**
* Closes or releases all allocated resources such as database connections, file streams, or any other
* external system handles that were utilized during the task execution. This method is invoked
* unconditionally, ensuring that resources are properly managed whether the task completes
* successfully, fails, or is canceled. It is crucial for preventing resource leaks and maintaining
* the overall health and efficiency of the application.
* <p>
* Note: Implementations of this method should handle potential exceptions internally and log them
* appropriately to avoid interrupting the normal flow of cleanup operations.
*/
protected abstract void closeOrReleaseResources();
@Override
public boolean onSuccess() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return false;
}
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
return false;
}
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
if (null == job) {
log.info("job is null, job id is {}", jobId);
return false;
}
job.onTaskSuccess(this);
return true;
}
/**
* Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources.
* This method encapsulates the core cancellation logic, calling the abstract method
* {@link #executeCancelLogic(boolean)} for task-specific actions.
*
* @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping
* the original exception.
*/
@Override
public boolean cancel(boolean needWaitCancelComplete) throws JobException {
if (TaskStatus.SUCCESS.equals(status) || TaskStatus.FAILED.equals(status) || TaskStatus.CANCELED.equals(
status)) {
return false;
}
try {
status = TaskStatus.CANCELED;
executeCancelLogic(needWaitCancelComplete);
return true;
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
throw new JobException(e);
} finally {
closeOrReleaseResources();
}
}
/**
* Abstract method for implementing the task-specific cancellation logic.
* Subclasses must override this method to provide their own implementation of how a task should be canceled.
*
* @throws Exception Any exception that might occur during the cancellation process in the subclass.
*/
protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception;
public static TUniqueId generateQueryId() {
UUID taskId = UUID.randomUUID();
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
}
@Override
public void before() throws JobException {
status = TaskStatus.RUNNING;
setStartTimeMs(System.currentTimeMillis());
}
public void runTask() throws JobException {
try {
before();
run();
onSuccess();
} catch (Exception e) {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
} finally {
// The cancel logic will call the closeOrReleased Resources method by itself.
// If it is also called here,
// it may result in the inability to obtain relevant information when canceling the task
if (!TaskStatus.CANCELED.equals(status)) {
closeOrReleaseResources();
}
}
}
public boolean isCancelled() {
return status.equals(TaskStatus.CANCELED);
}
public String getJobName() {
AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
return job == null ? "" : job.getJobName();
}
public Job getJobOrJobException() throws JobException {
AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId);
if (job == null) {
throw new JobException("job not exist, jobId:" + jobId);
}
return job;
}
@Override
public String toString() {
return "AbstractTask{"
+ "jobId=" + jobId
+ ", taskId=" + taskId
+ ", status=" + status
+ ", createTimeMs=" + createTimeMs
+ ", startTimeMs=" + startTimeMs
+ ", finishTimeMs=" + finishTimeMs
+ ", taskType=" + taskType
+ ", errMsg='" + errMsg + '\''
+ '}';
}
}