ScheduleRule.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * ScheduleRule: RoutineLoad PAUSED -> NEED_SCHEDULE
 */
public class ScheduleRule {
    private static final Logger LOG = LogManager.getLogger(ScheduleRule.class);
    private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
    private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
    private static int deadBeCount() {
        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
        int total = systemInfoService.getAllBackendIds(false).size();
        int alive = systemInfoService.getAllBackendIds(true).size();
        return total - alive;
    }
    /**
     * check if RoutineLoadJob is auto schedule
     * @param jobRoutine
     * @return
     */
    public static boolean isNeedAutoSchedule(RoutineLoadJob jobRoutine) {
        if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
            return false;
        }
        /*
         * Handle all backends are down.
         */
        if (jobRoutine.pauseReason != null
                && jobRoutine.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
                && jobRoutine.pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
                && jobRoutine.pauseReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) {
            int dead = deadBeCount();
            if (dead > Config.max_tolerable_backend_down_num) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("dead backend num {} is larger than config {}, "
                                    + "routine load job {} can not be auto rescheduled",
                            dead, Config.max_tolerable_backend_down_num, jobRoutine.id);
                }
                return false;
            }
            if (jobRoutine.latestResumeTimestamp == 0) { //the first resume
                jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
                jobRoutine.autoResumeCount = 1;
                return true;
            } else {
                long current = System.currentTimeMillis();
                if (current - jobRoutine.latestResumeTimestamp < Config.period_of_auto_resume_min * 60000L) {
                    long autoResumeIntervalTimeSec = calAutoResumeInterval(jobRoutine);
                    if (current - jobRoutine.latestResumeTimestamp > autoResumeIntervalTimeSec * 1000L) {
                        LOG.info("try to auto reschedule routine load {}, latestResumeTimestamp: {},"
                                + "  autoResumeCount: {}, pause reason: {}",
                                jobRoutine.id, jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
                                jobRoutine.pauseReason == null ? "null" : jobRoutine.pauseReason.getCode().name());
                        jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
                        if (jobRoutine.autoResumeCount < Long.MAX_VALUE) {
                            jobRoutine.autoResumeCount++;
                        }
                        return true;
                    }
                } else {
                    /**
                     * for example:
                     *       the first resume time at 10:01
                     *       the second resume time at 10:03
                     *       the third resume time at 10:20
                     *           --> we must be reset counter because a new period for AutoResume RoutineLoadJob
                     */
                    jobRoutine.latestResumeTimestamp = current;
                    jobRoutine.autoResumeCount = 1;
                    return true;
                }
            }
        }
        return false;
    }
    public static long calAutoResumeInterval(RoutineLoadJob jobRoutine) {
        return jobRoutine.autoResumeCount < 5
                    ? Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
                            MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
    }
}