JobScheduler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.scheduler;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.executor.TimerJobSchedulerTask;
import org.apache.doris.job.manager.TaskDisruptorGroupManager;
import org.apache.doris.job.task.AbstractTask;
import io.netty.util.HashedWheelTimer;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Log4j2
public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
/**
* scheduler tasks, it's used to scheduler job
*/
private HashedWheelTimer timerTaskScheduler;
private TaskDisruptor timerJobDisruptor;
private TaskDisruptorGroupManager taskDisruptorGroupManager;
private long latestBatchSchedulerTimerTaskTimeMs = 0L;
private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
private static final int HASHED_WHEEL_TIMER_TICKS_PER_WHEEL = 660;
private final Map<Long, T> jobMap;
public JobScheduler(Map<Long, T> jobMap) {
this.jobMap = jobMap;
}
/**
* batch scheduler interval ms time
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
/**
* Finished job will be cleared after 24 hours
*/
private static final long FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS =
(Config.finished_job_cleanup_threshold_time_hour > 0
? Config.finished_job_cleanup_threshold_time_hour : 24) * 3600 * 1000L;
public void start() {
timerTaskScheduler = new HashedWheelTimer(new CustomThreadFactory("timer-task-scheduler"), 1,
TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL);
timerTaskScheduler.start();
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
/**
* We will cycle system scheduler tasks every 10 minutes.
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks, time is " + TimeUtils
.longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
private void batchSchedulerTimerJob() {
executeTimerJobIdsWithinLastTenMinutesWindow();
}
public void scheduleOneJob(T job) throws JobException {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
return;
}
// not-schedule task
if (!job.getJobConfig().checkIsTimerJob()) {
//manual job will not scheduler
if (JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) {
if (job.getJobConfig().isImmediate()) {
schedulerInstantJob(job, TaskType.MANUAL, null);
}
return;
}
//todo skip streaming job,improve in the future
if (JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
}
// one-time task
if (job.getJobConfig().isImmediate() && JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) {
schedulerInstantJob(job, TaskType.SCHEDULED, null);
return;
}
// when reach here, it has time schedule rules. it's RECURRING job and immediate
// is true
// FIXME: why not check RECURRING?
if (job.getJobConfig().isImmediate()) {
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
//if it's timer job and trigger last window already start, we will scheduler it immediately
cycleTimerJobScheduler(job, System.currentTimeMillis());
}
public void cycleTimerJobScheduler(T job) {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
return;
}
if (!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) {
return;
}
//if it's timer job and trigger last window already start, we will scheduler it immediately
cycleTimerJobScheduler(job, System.currentTimeMillis());
}
@Override
public void close() throws IOException {
//todo implement this later
}
private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
startTimeWindowMs = TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isEmpty(delaySeconds)) {
log.info("skip job {} scheduler timer job, delay seconds is empty", job.getJobName());
return;
}
log.info("job {} scheduler timer job, delay seconds size is {}", job.getJobName(), delaySeconds.size());
if (CollectionUtils.isNotEmpty(delaySeconds)) {
delaySeconds.forEach(delaySecond -> {
TimerJobSchedulerTask<T> timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job);
timerTaskScheduler.newTimeout(timerJobSchedulerTask, delaySecond, TimeUnit.SECONDS);
});
}
}
public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobException {
// if context is null, maybe no tasks generated
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, context);
if (CollectionUtils.isEmpty(tasks)) {
log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(),
job.getJobName());
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
}
return;
}
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
task.onFail(errorMsg);
throw new JobException(errorMsg);
}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),
job.getJobName(), task.getTaskId());
}
}
/**
* We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
*/
private void executeTimerJobIdsWithinLastTenMinutesWindow() {
long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
long currentTimeMs = TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
log.info("execute timer job ids within last ten minutes window, last time window is {}",
TimeUtils.longToTimeString(lastTimeWindowMs));
if (jobMap.isEmpty()) {
return;
}
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
if (job.getJobStatus().equals(JobStatus.FINISHED) || job.getJobStatus().equals(JobStatus.STOPPED)) {
clearEndJob(job);
continue;
}
if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) {
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}
}
private void clearEndJob(T job) {
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS > System.currentTimeMillis()) {
return;
}
try {
Env.getCurrentEnv().getJobManager().unregisterJob(job.getJobId());
log.info("clear finish job, job id is {}, job name is {}", job.getJobId(), job.getJobName());
} catch (JobException e) {
log.error("clear finish job error, job id is {}", job.getJobId(), e);
}
}
}