StreamingJobSchedulerTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

public class StreamingJobSchedulerTask extends AbstractTask {
    private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
    private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
    private StreamingInsertJob streamingInsertJob;

    public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) {
        this.streamingInsertJob = streamingInsertJob;
    }

    @Override
    public void run() throws JobException {
        switch (streamingInsertJob.getJobStatus()) {
            case PENDING:
                streamingInsertJob.createStreamingInsertTask();
                streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
                streamingInsertJob.setAutoResumeCount(0);
                break;
            case RUNNING:
                streamingInsertJob.fetchMeta();
                break;
            case PAUSED:
                autoResumeHandler();
                break;
            default:
                break;
        }
    }

    private void autoResumeHandler() throws JobException {
        final FailureReason failureReason = streamingInsertJob.getFailureReason();
        final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp();
        final long autoResumeCount = streamingInsertJob.getAutoResumeCount();
        final long current = System.currentTimeMillis();

        if (failureReason != null
                && failureReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
                && failureReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
                && failureReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) {
            long autoResumeIntervalTimeSec = autoResumeCount < 5
                        ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
                                MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
            if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) {
                streamingInsertJob.setLatestAutoResumeTimestamp(current);
                if (autoResumeCount < Long.MAX_VALUE) {
                    streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
                }
                streamingInsertJob.updateJobStatus(JobStatus.PENDING);
                return;
            }
        }
    }

    @Override
    protected void closeOrReleaseResources() {
        // do nothing
    }

    @Override
    protected void executeCancelLogic(boolean needWaitCancelComplete) throws Exception {
        if (streamingInsertJob.getRunningStreamTask() != null) {
            streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
        }
    }

    @Override
    public TRow getTvfInfo(String jobName) {
        StreamingInsertTask runningTask = streamingInsertJob.getRunningStreamTask();
        if (runningTask == null) {
            return null;
        }
        TRow trow = new TRow();
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));
        trow.addToColumnValue(new TCell().setStringVal(jobName));
        trow.addToColumnValue(new TCell().setStringVal(runningTask.getLabelName()));
        trow.addToColumnValue(new TCell().setStringVal(runningTask.getStatus().name()));
        // err msg
        String errMsg = "";
        if (StringUtils.isNotBlank(runningTask.getErrMsg())
                && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
            errMsg = runningTask.getErrMsg();
        } else {
            errMsg = runningTask.getOtherMsg();
        }
        trow.addToColumnValue(new TCell().setStringVal(StringUtils.isNotBlank(errMsg)
                ? errMsg : FeConstants.null_string));

        // create time
        trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
        trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? FeConstants.null_string
                : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
        // load end time
        trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));

        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
                .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
        if (!loadJobs.isEmpty()) {
            LoadJob loadJob = loadJobs.get(0);
            if (loadJob.getLoadingStatus() != null && loadJob.getLoadingStatus().getTrackingUrl() != null) {
                trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
            } else {
                trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
            }

            if (loadJob.getLoadStatistic() != null) {
                trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
            } else {
                trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
            }
        } else {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        }

        if (runningTask.getUserIdentity() == null) {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
        }
        trow.addToColumnValue(new TCell().setStringVal(""));
        return trow;
    }
}