// JobExecutionConfiguration.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.common.util.TimeUtils;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
/**
 * Execution settings of a job: how it is triggered ({@link JobExecuteType}) and, for the
 * timer-driven types, the {@link TimerDefinition} supplying start time, end time and interval.
 *
 * <p>Besides holding the settings, this class validates them ({@link #checkParams()}) and turns
 * them into concrete trigger delays for a scheduling window ({@link #getTriggerDelayTimes}).
 */
public class JobExecutionConfiguration {

    @Getter
    @Setter
    @SerializedName(value = "td")
    private TimerDefinition timerDefinition;

    @Getter
    @Setter
    @SerializedName(value = "ec")
    private JobExecuteType executeType;

    /** When true, validation does not reject a start time that already lies in the past. */
    @Getter
    @Setter
    private boolean immediate = false;

    /**
     * Maximum number of concurrent tasks, {@code <= 0} means no limit.
     * If the number of tasks exceeds the limit, the task will be delayed execution.
     * todo: implement this later, we need to consider concurrency strategies
     */
    private Integer maxConcurrentTaskNum;

    /** Initializes the timer definition, if one is configured. */
    public void initParams() {
        initTimerDefinition();
    }

    /**
     * Validates this configuration.
     *
     * <p>INSTANT and MANUAL jobs need nothing beyond a non-null execute type. Every other type
     * requires a valid timer definition; ONE_TIME and STREAMING additionally require a start time,
     * and RECURRING requires both an interval and an interval unit.
     *
     * @throws IllegalArgumentException if any required setting is missing or invalid
     */
    public void checkParams() {
        if (executeType == null) {
            throw new IllegalArgumentException("executeType cannot be null");
        }
        if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) {
            return;
        }
        checkTimerDefinition();
        // ONE_TIME and STREAMING share the same start-time requirement.
        if (executeType == JobExecuteType.ONE_TIME || executeType == JobExecuteType.STREAMING) {
            validateStartTimeMs();
            return;
        }
        if (executeType == JobExecuteType.RECURRING) {
            if (timerDefinition.getInterval() == null) {
                throw new IllegalArgumentException("interval cannot be null when executeType is RECURRING");
            }
            if (timerDefinition.getIntervalUnit() == null) {
                throw new IllegalArgumentException("intervalUnit cannot be null when executeType is RECURRING");
            }
        }
    }

    /** Requires a timer definition to be present and delegates to its own validation. */
    private void checkTimerDefinition() {
        if (timerDefinition == null) {
            throw new IllegalArgumentException(
                    "timerDefinition cannot be null when executeType is not instant or manual");
        }
        timerDefinition.checkParams();
    }

    /** Delegates initialization to the timer definition when one is present. */
    private void initTimerDefinition() {
        if (timerDefinition != null) {
            timerDefinition.initParams();
        }
    }

    /**
     * Requires a start time; unless the job is marked immediate, the start time must not be
     * earlier than the current system time.
     */
    private void validateStartTimeMs() {
        if (timerDefinition.getStartTimeMs() == null) {
            throw new IllegalArgumentException("startTimeMs cannot be null");
        }
        if (isImmediate()) {
            return;
        }
        if (timerDefinition.getStartTimeMs() < System.currentTimeMillis()) {
            throw new IllegalArgumentException("startTimeMs must be greater than current time");
        }
    }

    /**
     * Returns the delays, in seconds from {@code currentTimeMs}, at which the job has to be
     * triggered inside the given scheduling window.
     *
     * <ul>
     *   <li>ONE_TIME / STREAMING: at most one delay, and only if the job has no recorded
     *       scheduler time yet and its start time is not after {@code endTimeMs}.</li>
     *   <li>RECURRING: one delay per trigger point that falls inside the window.</li>
     *   <li>any other type: an empty list.</li>
     * </ul>
     *
     * <p>Side effect: every trigger that is handed out is recorded on the timer definition via
     * {@code setLatestSchedulerTimeMs}.
     *
     * @param currentTimeMs the reference "now" in epoch milliseconds
     * @param startTimeMs   start of the scheduling window in epoch milliseconds
     * @param endTimeMs     end of the scheduling window in epoch milliseconds
     * @return a new mutable list of delays in seconds, possibly empty
     */
    public List<Long> getTriggerDelayTimes(Long currentTimeMs, Long startTimeMs, Long endTimeMs) {
        if (JobExecuteType.ONE_TIME.equals(executeType)) {
            // Already executed or already sitting in the schedule queue.
            if (null != timerDefinition.getLatestSchedulerTimeMs()) {
                return new ArrayList<>();
            }
            return scheduleSingleTrigger(currentTimeMs, endTimeMs);
        }
        if (JobExecuteType.STREAMING.equals(executeType) && null != timerDefinition) {
            if (null == timerDefinition.getStartTimeMs() || null != timerDefinition.getLatestSchedulerTimeMs()) {
                return new ArrayList<>();
            }
            return scheduleSingleTrigger(currentTimeMs, endTimeMs);
        }
        if (JobExecuteType.RECURRING.equals(executeType)) {
            // The job has not started by the end of the window, or ended before the window began.
            if (timerDefinition.getStartTimeMs() > endTimeMs
                    || (null != timerDefinition.getEndTimeMs() && timerDefinition.getEndTimeMs() < startTimeMs)) {
                return new ArrayList<>();
            }
            long intervalValue = timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval());
            long jobStartTimeMs = timerDefinition.getStartTimeMs();
            return getExecutionDelaySeconds(startTimeMs, endTimeMs, jobStartTimeMs,
                    intervalValue, currentTimeMs);
        }
        return new ArrayList<>();
    }

    /**
     * Shared tail of the single-shot (ONE_TIME / STREAMING) scheduling path: if the job's start
     * time is not after the window end, emits its delay and records the start time as the latest
     * scheduler time. Callers have already checked that the job was not scheduled before.
     */
    private List<Long> scheduleSingleTrigger(Long currentTimeMs, Long endTimeMs) {
        List<Long> delayTimeSeconds = new ArrayList<>();
        Long jobStartTimeMs = timerDefinition.getStartTimeMs();
        // Not within this schedule window.
        if (endTimeMs < jobStartTimeMs) {
            return delayTimeSeconds;
        }
        delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, jobStartTimeMs));
        timerDefinition.setLatestSchedulerTimeMs(jobStartTimeMs);
        return delayTimeSeconds;
    }

    /**
     * Returns the delay in whole seconds between the current time and the given start time,
     * or 0 when the start time is not in the future.
     */
    private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
        if (startTimeMs <= currentTimeMs) {
            return 0L;
        }
        // Plain subtraction: the former "* 1000 / 1000" round trip was a no-op that could only
        // alter the result by overflowing long.
        return (startTimeMs - currentTimeMs) / 1000;
    }

    /**
     * Returns the delays, in seconds from {@code currentTimeMs}, of every recurring trigger inside
     * the window {@code [windowStartTimeMs, windowEndTimeMs)}. Trigger points are aligned to
     * {@code startTimeMs + k * intervalMs}; triggers at or after the timer definition's end time
     * are dropped. Each emitted trigger is recorded as the latest scheduler time.
     */
    private List<Long> getExecutionDelaySeconds(long windowStartTimeMs, long windowEndTimeMs, long startTimeMs,
                                                long intervalMs, long currentTimeMs) {
        List<Long> timestamps = new ArrayList<>();
        long windowDuration = windowEndTimeMs - windowStartTimeMs;
        if (windowDuration <= 0 || intervalMs <= 0) {
            return timestamps; // Return an empty list if there won't be any trigger time
        }
        // First interval-aligned point at or after the window start.
        long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs)
                % intervalMs)) % intervalMs;
        // A trigger can never be earlier than the job's own start time.
        if (firstTriggerTime < startTimeMs) {
            firstTriggerTime = startTimeMs;
        }
        if (firstTriggerTime < currentTimeMs) {
            // Skip whole intervals to reach the largest trigger time <= currentTimeMs
            long intervalsToAdd = (currentTimeMs - firstTriggerTime) / intervalMs;
            firstTriggerTime += intervalsToAdd * intervalMs;
        }
        if (firstTriggerTime > windowEndTimeMs) {
            return timestamps; // Return an empty list if there won't be any trigger time
        }
        // Calculate the trigger time list
        for (long triggerTime = firstTriggerTime; triggerTime < windowEndTimeMs; triggerTime += intervalMs) {
            if (null == timerDefinition.getEndTimeMs()
                    || triggerTime < timerDefinition.getEndTimeMs()) {
                timerDefinition.setLatestSchedulerTimeMs(triggerTime);
                timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime));
            }
        }
        return timestamps;
    }

    /**
     * Renders the scheduling strategy as a human-readable string, e.g. {@code "AT <time>"} for
     * ONE_TIME or {@code "EVERY <n> <unit> STARTS <time> [ENDS <time>]"} for RECURRING.
     *
     * @return the rendered strategy, or {@code "UNKNOWN"} when the execute type is unset or has
     *         no dedicated rendering
     */
    public String convertRecurringStrategyToString() {
        // Switching on a null enum throws NullPointerException; treat it like any unrendered type.
        if (executeType == null) {
            return "UNKNOWN";
        }
        switch (executeType) {
            case ONE_TIME:
                return "AT " + TimeUtils.longToTimeString(timerDefinition.getStartTimeMs());
            case RECURRING:
                String result = "EVERY " + timerDefinition.getInterval() + " "
                        + timerDefinition.getIntervalUnit().name() + " STARTS "
                        + TimeUtils.longToTimeString(timerDefinition.getStartTimeMs());
                if (null != timerDefinition.getEndTimeMs()) {
                    result += " ENDS " + TimeUtils.longToTimeString(timerDefinition.getEndTimeMs());
                }
                return result;
            case MANUAL:
                return "MANUAL TRIGGER";
            case INSTANT:
                return "INSTANT";
            default:
                // STREAMING has no dedicated rendering yet and ends up here as well.
                return "UNKNOWN";
        }
    }

    /** Returns whether a timer definition is configured for this job. */
    public boolean checkIsTimerJob() {
        return null != timerDefinition;
    }
}