CreateJobInfo.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands.info;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Strings;

import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
 * Build job info and analyze the SQL statement to create a job.
 *
 * <p>Holds the raw, not-yet-validated pieces of a job-creation statement (name, schedule
 * clauses, comment, SQL text and properties) and turns them into an {@link AbstractJob}
 * through {@link #analyzeAndBuildJobInfo(ConnectContext)}.
 */
public class CreateJobInfo {

    // Job names starting with this prefix are rejected by checkJobName (prefix is used by inner jobs).
    private static final String EXCLUDE_JOB_NAME_PREFIX = "inner_";

    private final Optional<String> labelNameOptional;

    private final Optional<String> onceJobStartTimestampOptional;

    private final Optional<Long> intervalOptional;

    private final Optional<String> intervalTimeUnitOptional;

    private final Optional<String> startsTimeStampOptional;

    private final Optional<String> endsTimeStampOptional;

    private final Optional<Boolean> immediateStartOptional;

    private final String comment;

    private final String executeSql;
    private final boolean streamingJob;
    private final Map<String, String> jobProperties;

    /**
     * Constructor for CreateJobInfo.
     *
     * @param labelNameOptional             Job name.
     * @param onceJobStartTimestampOptional Start time for a one-time job.
     * @param intervalOptional              Interval for a recurring job.
     * @param intervalTimeUnitOptional      Interval time unit for a recurring job.
     * @param startsTimeStampOptional       Start time for a recurring job.
     * @param endsTimeStampOptional         End time for a recurring job.
     * @param immediateStartOptional        Immediate start for a job.
     * @param comment                       Comment for the job.
     * @param executeSql                    Original SQL statement.
     * @param streamingJob                  Whether the job is a streaming job.
     * @param jobProperties                 Job properties; only forwarded when a streaming job is created.
     */
    public CreateJobInfo(Optional<String> labelNameOptional, Optional<String> onceJobStartTimestampOptional,
                         Optional<Long> intervalOptional, Optional<String> intervalTimeUnitOptional,
                         Optional<String> startsTimeStampOptional, Optional<String> endsTimeStampOptional,
                         Optional<Boolean> immediateStartOptional, String comment, String executeSql,
                         boolean streamingJob, Map<String, String> jobProperties) {
        this.labelNameOptional = labelNameOptional;
        this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
        this.intervalOptional = intervalOptional;
        this.intervalTimeUnitOptional = intervalTimeUnitOptional;
        this.startsTimeStampOptional = startsTimeStampOptional;
        this.endsTimeStampOptional = endsTimeStampOptional;
        this.immediateStartOptional = immediateStartOptional;
        this.comment = comment;
        this.executeSql = executeSql;
        this.streamingJob = streamingJob;
        this.jobProperties = jobProperties;
    }

    /**
     * Analyzes the provided SQL statement and builds the job information.
     *
     * <p>Steps: privilege check, job-name validation, database existence check, selection of
     * the execute type (STREAMING if {@code streamingJob}, else RECURRING when an interval is
     * present, else ONE_TIME), timer construction, and finally job creation from the SQL text.
     *
     * @param ctx Connect context; only its current database name is read here.
     * @return AbstractJob instance.
     * @throws UserException If there is an error during SQL analysis or job creation.
     */
    public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws UserException {
        checkAuth();

        String jobName = labelNameOptional.orElseThrow(() -> new AnalysisException("labelName is null"));
        if (jobName.isEmpty()) {
            throw new AnalysisException("Job name can not be empty");
        }
        checkJobName(jobName);

        String dbName = ctx.getDatabase();
        // Called only for its validation effect; the returned database object is not needed.
        Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);

        JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
        JobExecuteType executeType;
        if (streamingJob) {
            // Streaming takes precedence over any interval clause and always starts immediately.
            executeType = JobExecuteType.STREAMING;
            jobExecutionConfiguration.setImmediate(true);
        } else if (intervalOptional.isPresent()) {
            executeType = JobExecuteType.RECURRING;
        } else {
            executeType = JobExecuteType.ONE_TIME;
        }
        jobExecutionConfiguration.setExecuteType(executeType);

        TimerDefinition timerDefinition = new TimerDefinition();
        if (executeType == JobExecuteType.ONE_TIME) {
            buildOnceJob(timerDefinition, jobExecutionConfiguration);
        } else if (executeType == JobExecuteType.STREAMING) {
            buildStreamingJob(timerDefinition);
        } else {
            buildRecurringJob(timerDefinition, jobExecutionConfiguration);
        }
        jobExecutionConfiguration.setTimerDefinition(timerDefinition);

        return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, jobProperties);
    }

    /**
     * Builds a TimerDefinition for a streaming job: interval unit SECOND, start time "now".
     *
     * @param timerDefinition Timer definition to be built.
     */
    private void buildStreamingJob(TimerDefinition timerDefinition) {
        // NOTE(review): no interval value is set here; an earlier draft derived it from the job
        // properties (max interval seconds) - confirm where the interval is configured.
        timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
        timerDefinition.setStartTimeMs(System.currentTimeMillis());
    }

    /**
     * Builds a TimerDefinition for a once-job.
     *
     * <p>With immediate start the start time is "now"; otherwise an explicit start timestamp
     * is mandatory.
     *
     * @param timerDefinition           Timer definition to be built.
     * @param jobExecutionConfiguration Job execution configuration.
     * @throws AnalysisException If the job is not configured correctly.
     */
    private void buildOnceJob(TimerDefinition timerDefinition,
                              JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException {
        if (isImmediateStart()) {
            jobExecutionConfiguration.setImmediate(true);
            timerDefinition.setStartTimeMs(System.currentTimeMillis());
            return;
        }

        // Ensure start time is provided for once jobs.
        String startTime = onceJobStartTimestampOptional.orElseThrow(()
                -> new AnalysisException("Once time job must set start time"));
        timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(startTime));
    }

    /**
     * Builds a TimerDefinition for a recurring job.
     *
     * @param timerDefinition           Timer definition to be built.
     * @param jobExecutionConfiguration Job execution configuration.
     * @throws AnalysisException If the job is not configured correctly.
     */
    private void buildRecurringJob(TimerDefinition timerDefinition,
                                   JobExecutionConfiguration jobExecutionConfiguration) throws AnalysisException {
        // Ensure interval is provided for recurring jobs.
        long interval = intervalOptional.orElseThrow(()
                -> new AnalysisException("Interval must be set for recurring job"));
        timerDefinition.setInterval(interval);

        // Ensure interval time unit is provided for recurring jobs.
        String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(()
                -> new AnalysisException("Interval time unit must be set for recurring job"));
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
        // (e.g. a Turkish locale would map 'i' to a dotted capital I).
        IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase(Locale.ROOT));
        if (intervalUnit == null) {
            throw new AnalysisException("Invalid interval time unit: " + intervalTimeUnit);
        }

        // Second-level scheduling is only allowed when the test switch is enabled.
        if (intervalUnit == IntervalUnit.SECOND && !Config.enable_job_schedule_second_for_test) {
            throw new AnalysisException("Interval time unit can not be second in production mode");
        }

        timerDefinition.setIntervalUnit(intervalUnit);

        // Set end time if provided.
        endsTimeStampOptional.ifPresent(s -> timerDefinition.setEndTimeMs(stripQuotesAndParseTimestamp(s)));

        // Set immediate start if configured; an explicit STARTS clause is ignored in that case.
        if (isImmediateStart()) {
            jobExecutionConfiguration.setImmediate(true);
            // Avoid immediate re-scheduling by setting start time slightly in the past.
            timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100);
            return;
        }
        // Set start time if provided.
        startsTimeStampOptional.ifPresent(s -> timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s)));
    }

    /**
     * Tells whether the statement requested an immediate start.
     *
     * @return true only when the immediate-start option is present and true.
     */
    private boolean isImmediateStart() {
        return immediateStartOptional.orElse(false);
    }

    /**
     * Checks that the current connection may create this job.
     *
     * <p>Streaming jobs delegate to {@link StreamingInsertJob#checkPrivilege}; every other job
     * requires the global ADMIN privilege. Uses the thread's {@link ConnectContext#get()}.
     *
     * @throws AnalysisException If the privilege check fails.
     */
    protected void checkAuth() throws AnalysisException {
        if (streamingJob) {
            StreamingInsertJob.checkPrivilege(ConnectContext.get(), executeSql);
            return;
        }

        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
        }
    }

    /**
     * Analyzes the provided SQL statement and creates an appropriate job based on the parsed logical plan.
     * Currently, only "InsertIntoTableCommand" is supported for job creation.
     *
     * @param sql                       the SQL statement to be analyzed
     * @param currentDbName             the current database name where the SQL statement will be executed
     * @param jobExecutionConfiguration the configuration for job execution
     * @param properties                job properties, forwarded to streaming jobs only
     * @return an instance of AbstractJob corresponding to the SQL statement
     * @throws UserException if there is an error during SQL analysis or job creation
     */
    private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
                                            JobExecutionConfiguration jobExecutionConfiguration,
                                            Map<String, String> properties) throws UserException {
        if (jobExecutionConfiguration.getExecuteType() == JobExecuteType.STREAMING) {
            return analyzeAndCreateStreamingInsertJob(sql, currentDbName, jobExecutionConfiguration, properties);
        }
        return analyzeAndCreateInsertJob(sql, currentDbName, jobExecutionConfiguration);
    }

    /**
     * Parses {@code sql}, verifies it is an insert command, initializes its plan and wraps it
     * in an {@link InsertJob} created in RUNNING status.
     *
     * @param sql                       the SQL statement to be analyzed
     * @param currentDbName             the database the job belongs to
     * @param jobExecutionConfiguration the configuration for job execution
     * @return the created insert job
     * @throws UserException if the SQL is not an insert command or plan initialization / job creation fails
     */
    private AbstractJob analyzeAndCreateInsertJob(String sql, String currentDbName,
            JobExecutionConfiguration jobExecutionConfiguration) throws UserException {
        LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
        if (!(logicalPlan instanceof InsertIntoTableCommand)) {
            throw new AnalysisException("Not support this sql : " + sql + " Command class is "
                    + logicalPlan.getClass().getName() + ".");
        }

        InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
        try {
            insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
            return new InsertJob(labelNameOptional.get(),
                    JobStatus.RUNNING,
                    currentDbName,
                    comment,
                    ConnectContext.get().getCurrentUserIdentity(),
                    jobExecutionConfiguration,
                    System.currentTimeMillis(),
                    sql);
        } catch (Exception e) {
            // Keep the original exception as the cause so the root stack trace is not lost.
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    /**
     * Parses {@code sql}, verifies it is an insert command, initializes its plan and wraps it
     * in a {@link StreamingInsertJob} created in PENDING status.
     *
     * @param sql                       the SQL statement to be analyzed
     * @param currentDbName             the database the job belongs to
     * @param jobExecutionConfiguration the configuration for job execution
     * @param properties                job properties passed to the streaming job
     * @return the created streaming insert job
     * @throws UserException if the SQL is not an insert command or plan initialization / job creation fails
     */
    private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName,
            JobExecutionConfiguration jobExecutionConfiguration, Map<String, String> properties) throws UserException {
        LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
        if (!(logicalPlan instanceof InsertIntoTableCommand)) {
            // Name the supported command, and report the rejected one separately.
            throw new AnalysisException("Only " + InsertIntoTableCommand.class.getName()
                    + " is supported to use with streaming job together, but got "
                    + logicalPlan.getClass().getName());
        }

        InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
        try {
            insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
            return new StreamingInsertJob(labelNameOptional.get(),
                    JobStatus.PENDING,
                    currentDbName,
                    comment,
                    ConnectContext.get().getCurrentUserIdentity(),
                    jobExecutionConfiguration,
                    System.currentTimeMillis(),
                    sql,
                    properties);
        } catch (Exception e) {
            // Keep the original exception as the cause so the root stack trace is not lost.
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    /**
     * Validates a job name: it must be non-empty and must not use the reserved prefix.
     *
     * @param jobName the job name to validate
     * @throws AnalysisException if the name is null, empty, or starts with the reserved prefix
     */
    private void checkJobName(String jobName) throws AnalysisException {
        if (Strings.isNullOrEmpty(jobName)) {
            throw new AnalysisException("job name can not be null");
        }
        if (jobName.startsWith(EXCLUDE_JOB_NAME_PREFIX)) {
            throw new AnalysisException("job name can not start with " + EXCLUDE_JOB_NAME_PREFIX);
        }
    }

    /**
     * Strips quotes from the input string and parses it to a timestamp.
     *
     * <p>One pair of matching surrounding quotes (both single or both double) is removed, the
     * remainder is trimmed and handed to {@link TimeUtils#timeStringToLong(String)}. A string
     * consisting of a single quote character is not stripped (there is no matching pair).
     *
     * @param str The input string potentially enclosed in single or double quotes.
     * @return The parsed timestamp as a long value, or -1L if the input is null or empty.
     */
    public static Long stripQuotesAndParseTimestamp(String str) {
        if (str == null || str.isEmpty()) {
            return -1L;
        }
        String value = str;
        // Require at least two characters so that a lone quote is not treated as a pair
        // (which would otherwise produce substring(1, 0) and an out-of-bounds failure).
        if (value.length() >= 2) {
            char first = value.charAt(0);
            char last = value.charAt(value.length() - 1);
            if ((first == '\'' || first == '"') && first == last) {
                value = value.substring(1, value.length() - 1);
            }
        }
        return TimeUtils.timeStringToLong(value.trim());
    }

    /**
     * Returns whether this statement creates a streaming job.
     *
     * @return the {@code streamingJob} flag given at construction.
     */
    public boolean streamingJob() {
        return streamingJob;
    }
}