CreateJobStmt.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
* syntax:
* CREATE
* [DEFINER = user]
* JOB
* event_name
* ON SCHEDULE schedule
* [COMMENT 'string']
* DO event_body;
* schedule: {
* [STREAMING] AT timestamp
* | EVERY interval
* [STARTS timestamp ]
* [ENDS timestamp ]
* }
* interval:
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*/
@Deprecated
@Slf4j
public class CreateJobStmt extends DdlStmt implements NotFallbackInParser {
@Getter
private StatementBase doStmt;
@Getter
private AbstractJob jobInstance;
private final LabelName labelName;
private final String onceJobStartTimestamp;
private final Long interval;
private final String intervalTimeUnit;
private final String startsTimeStamp;
private final String endsTimeStamp;
private final String comment;
private String jobName;
public static final String CURRENT_TIMESTAMP_STRING = "current_timestamp";
private JobExecuteType executeType;
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
this.labelName = labelName;
this.onceJobStartTimestamp = onceJobStartTimestamp;
this.interval = interval;
this.intervalTimeUnit = intervalTimeUnit;
this.startsTimeStamp = startsTimeStamp;
this.endsTimeStamp = endsTimeStamp;
this.comment = comment;
this.doStmt = doStmt;
this.executeType = executeType;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
TimerDefinition timerDefinition = new TimerDefinition();
if (null != onceJobStartTimestamp) {
if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
}
}
if (null != interval) {
timerDefinition.setInterval(interval);
}
if (null != intervalTimeUnit) {
IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
if (null == intervalUnit) {
throw new AnalysisException("interval time unit can not be " + intervalTimeUnit);
}
if (intervalUnit.equals(IntervalUnit.SECOND)
&& !Config.enable_job_schedule_second_for_test) {
throw new AnalysisException("interval time unit can not be second");
}
timerDefinition.setIntervalUnit(intervalUnit);
}
if (null != startsTimeStamp) {
if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) {
jobExecutionConfiguration.setImmediate(true);
//To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time.
timerDefinition.setStartTimeMs(System.currentTimeMillis());
} else {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
}
}
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
checkJobName(labelName.getLabelName());
this.jobName = labelName.getLabelName();
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt, jobName, comment);
analyzerSqlStmt(executeSql);
// create job use label name as its job name
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
labelName.getDbName(),
comment,
ConnectContext.get().getCurrentUserIdentity(),
jobExecutionConfiguration,
System.currentTimeMillis(),
executeSql);
jobInstance = job;
}
private void checkJobName(String jobName) throws AnalysisException {
if (StringUtils.isBlank(jobName)) {
throw new AnalysisException("job name can not be null");
}
if (jobName.startsWith(excludeJobNamePrefix)) {
throw new AnalysisException("job name can not start with " + excludeJobNamePrefix);
}
}
protected static void checkAuth() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
private void analyzerSqlStmt(String sql) throws UserException {
NereidsParser parser = new NereidsParser();
LogicalPlan logicalPlan = parser.parseSingle(sql);
if (logicalPlan instanceof InsertIntoTableCommand) {
InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
try {
insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor());
} catch (Exception e) {
throw new AnalysisException(e.getMessage());
}
} else {
throw new AnalysisException("Not support this sql : " + sql);
}
}
/**
* parse execute sql from create job stmt
* Some stmt not implement toSql method,so we need to parse sql from originStmt
*/
private static String parseExecuteSql(String sql, String jobName, String comment) throws AnalysisException {
String lowerCaseSql = sql.toLowerCase();
String lowerCaseJobName = jobName.toLowerCase();
// Find the end position of the job name in the SQL statement.
int jobNameEndIndex = lowerCaseSql.indexOf(lowerCaseJobName) + lowerCaseJobName.length();
String subSqlStmt = lowerCaseSql.substring(jobNameEndIndex);
String originSubSqlStmt = sql.substring(jobNameEndIndex);
// If the comment is not empty, extract the SQL statement from the end position of the comment.
if (StringUtils.isNotBlank(comment)) {
String lowerCaseComment = comment.toLowerCase();
int splitDoIndex = subSqlStmt.indexOf(lowerCaseComment) + lowerCaseComment.length();
subSqlStmt = subSqlStmt.substring(splitDoIndex);
originSubSqlStmt = originSubSqlStmt.substring(splitDoIndex);
}
// Find the position of the "do" keyword and extract the execution SQL statement from this position.
int executeSqlIndex = subSqlStmt.indexOf("do");
String executeSql = originSubSqlStmt.substring(executeSqlIndex + 2).trim();
if (StringUtils.isBlank(executeSql)) {
throw new AnalysisException("execute sql has invalid format");
}
return executeSql;
}
protected static boolean isInnerJob(String jobName) {
return jobName.startsWith(excludeJobNamePrefix);
}
}