LoadTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.FailMsg;
import org.apache.doris.task.MasterTask;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;


public abstract class LoadTask extends MasterTask {

    public enum MergeType {
        MERGE,
        APPEND,
        DELETE
    }

    public enum TaskType {
        PENDING,
        LOADING
    }

    public enum Priority {
        HIGH(0),
        NORMAL(1),
        LOW(2);

        Priority(int value) {
            this.value = value;
        }

        private final int value;

        public int getValue() {
            return value;
        }
    }

    private static final Logger LOG = LogManager.getLogger(LoadTask.class);
    public static final Comparator<LoadTask> COMPARATOR = Comparator.comparing(LoadTask::getPriorityValue)
                .thenComparingLong(LoadTask::getSignature);

    protected TaskType taskType;
    protected LoadTaskCallback callback;
    protected TaskAttachment attachment;
    protected FailMsg failMsg = new FailMsg();
    protected int retryTime = 1;
    private volatile boolean done = false;
    protected long startTimeMs = 0;
    protected final Priority priority;

    public LoadTask(LoadTaskCallback callback, TaskType taskType, Priority priority) {
        this.taskType = taskType;
        this.signature = Env.getCurrentEnv().getNextId();
        this.callback = callback;
        this.priority = priority;
    }

    @Override
    protected void exec() {
        boolean isFinished = false;
        try {
            if (Config.isCloudMode()) {
                while (startTimeMs > System.currentTimeMillis()) {
                    try {
                        Thread.sleep(1000);
                        LOG.info("LoadTask:{} backoff startTimeMs:{} now:{}",
                                signature, startTimeMs, System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        LOG.info("ignore InterruptedException: ", e);
                    }
                }
            }
            // execute pending task
            executeTask();
            // callback on pending task finished
            callback.onTaskFinished(attachment);
            isFinished = true;
        } catch (UserException e) {
            failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                    .add("error_msg", "Failed to execute load task").build(), e);
        } catch (Throwable t) {
            failMsg.setMsg(t.getMessage() == null ? "" : t.getMessage());
            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                    .add("error_msg", "Unexpected failed to execute load task").build(), t);
        } finally {
            if (!isFinished) {
                // callback on pending task failed
                callback.onTaskFailed(signature, failMsg);
            }
            done = true;
        }
    }

    /**
     * init load task
     * @throws LoadException
     */
    public void init() throws LoadException {
    }

    /**
     * execute load task
     *
     * @throws UserException task is failed
     */
    abstract void executeTask() throws Exception;

    public int getRetryTime() {
        return retryTime;
    }

    // Derived class may need to override this.
    public void updateRetryInfo() {
        this.retryTime--;
        this.signature = Env.getCurrentEnv().getNextId();
    }

    public TaskType getTaskType() {
        return taskType;
    }

    public boolean isDone() {
        return done;
    }

    public void setStartTimeMs(long startTimeMs) {
        this.startTimeMs = startTimeMs;
    }

    public int getPriorityValue() {
        return this.priority.value;
    }

}