// RoutineLoadTaskScheduler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Routine load task scheduler takes pending routine load tasks from a queue and submits them to backends (BE).
 * Each round does the following:
 * Step1: update backend slot if interval more than BACKEND_SLOT_UPDATE_INTERVAL_MS
 * Step2: if the cluster has no idle slot, sleep SLOT_FULL_SLEEP_MS and end this round
 * Step3: take a task from queue and schedule this task
 *
 * The scheduler will be blocked in step3 till the queue receives a new task
 */
public class RoutineLoadTaskScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class);

    private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000; // 10s
    private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s

    private RoutineLoadManager routineLoadManager;
    // tasks waiting to be scheduled. Retried tasks are pushed to the head, delayed tasks to the tail.
    private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = new LinkedBlockingDeque<>();
    // timestamp (ms) of the last backend slot refresh, -1 means it has never been refreshed
    private long lastBackendSlotUpdateTime = -1;

    /**
     * Creates a scheduler with interval 0 which uses the routine load manager of the current Env.
     */
    @VisibleForTesting
    public RoutineLoadTaskScheduler() {
        super("Routine load task scheduler", 0);
        this.routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
    }

    /**
     * Creates a scheduler which works on the given routine load manager.
     *
     * @param routineLoadManager manager used to look up jobs and backend slots
     */
    public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) {
        // Set the polling interval to 1ms to avoid meaningless idling when there is no data,
        // resulting in increased CPU.
        // The wait/notify mechanism should be used later
        super("Routine load task scheduler", 1);
        this.routineLoadManager = routineLoadManager;
    }

    /**
     * Runs one scheduling round. No exception escapes from this method: failures are logged,
     * and an interruption additionally restores the interrupt status of the current thread.
     */
    @Override
    protected void runAfterCatalogReady() {
        try {
            process();
        } catch (InterruptedException e) {
            // do not lose the interruption: restore the flag so that the code driving this daemon can see it
            Thread.currentThread().interrupt();
            LOG.warn("RoutineLoadTaskScheduler has been interrupted", e);
        } catch (Throwable e) {
            LOG.warn("Failed to process one round of RoutineLoadTaskScheduler", e);
        }
    }

    /**
     * One scheduling round: refresh backend slots if needed, then take one task from the queue
     * and schedule it.
     *
     * @throws UserException declared for compatibility with the original signature
     * @throws InterruptedException if the thread is interrupted while sleeping or while waiting for a task
     */
    private void process() throws UserException, InterruptedException {
        // update the max slot num of each backend periodically
        updateBackendSlotIfNecessary();

        int idleSlotNum = routineLoadManager.getClusterIdleSlotNum();
        // scheduler will be blocked when there is no slot for task in cluster
        if (idleSlotNum == 0) {
            Thread.sleep(SLOT_FULL_SLEEP_MS);
            return;
        }

        // This step will be blocked when queue is empty.
        // It is kept out of the try block below so that an interruption is propagated, not swallowed.
        RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.take();
        try {
            // try to delay scheduling tasks that are perceived as Eof to MaxBatchInterval
            // to avoid too many small transactions
            if (routineLoadTaskInfo.getIsEof()) {
                RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
                // If the job can not be found, skip the delay and let scheduleOneTask()
                // decide whether the task has been abandoned.
                // 1000L forces long arithmetic when converting seconds to milliseconds.
                if (routineLoadJob != null
                        && System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
                        < routineLoadJob.getMaxBatchIntervalS() * 1000L) {
                    needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                    return;
                }
            }
            scheduleOneTask(routineLoadTaskInfo);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            // scheduleOneTask() has already updated the job state where necessary, just record the failure
            LOG.warn("Failed to schedule routine load task {} of job {}",
                    DebugUtil.printId(routineLoadTaskInfo.getId()), routineLoadTaskInfo.getJobId(), e);
        }
    }

    /**
     * Schedules one task: allocate a BE for it, begin its txn, create the thrift task and submit it to the BE.
     * If allocation or begin txn reports failure without exception, the task is pushed back to the head of
     * the queue. If an exception happens before submitting, the job is PAUSED (or CANCELLED when meta is not
     * found) and the exception is rethrown. A LoadException on submitting is only logged and recorded
     * on the job.
     *
     * @param routineLoadTaskInfo the task to schedule
     * @throws Exception if allocating the BE, beginning the txn or creating the thrift task fails
     */
    private void scheduleOneTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws Exception {
        routineLoadTaskInfo.setLastScheduledTime(System.currentTimeMillis());
        if (LOG.isDebugEnabled()) {
            LOG.debug("schedule routine load task info {} for job {}",
                    routineLoadTaskInfo.id, routineLoadTaskInfo.getJobId());
        }
        // check if task has been abandoned
        if (!routineLoadManager.checkTaskInJob(routineLoadTaskInfo)) {
            // task has been abandoned while renew task has been added in queue
            // or database has been deleted
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
                    .add("error_msg", "task has been abandoned when scheduling task")
                    .build());
            return;
        }

        try {
            // the task of a job in final state is dropped silently
            if (routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).isFinal()) {
                return;
            }
            // check if topic has more data to consume
            if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
                // nothing to consume now, put it to the tail to check again later
                needScheduleTasksQueue.addLast(routineLoadTaskInfo);
                return;
            }

            // allocate BE slot for this task.
            // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
            if (!allocateTaskToBe(routineLoadTaskInfo)) {
                // allocate failed, push it back to the queue to wait next scheduling
                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                return;
            }
        } catch (UserException e) {
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
                    .updateState(JobState.PAUSED, new ErrorReason(e.getErrorCode(), e.getMessage()), false);
            throw e;
        } catch (Exception e) {
            // exception happens, PAUSE the job
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
                    new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
                            "failed to allocate task: " + e.getMessage()), false);
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg",
                    "allocate task encounter exception: " + e.getMessage()).build(), e);
            throw e;
        }

        // begin txn
        try {
            if (!routineLoadTaskInfo.beginTxn()) {
                // begin txn failed. push it back to the queue to wait next scheduling
                // set BE id to -1 to release the BE slot
                routineLoadTaskInfo.setBeId(-1);
                needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
                return;
            }
        } catch (Exception e) {
            // exception happens, PAUSE the job
            // set BE id to -1 to release the BE slot
            routineLoadTaskInfo.setBeId(-1);
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
                    new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
                            "failed to begin txn: " + e.getMessage()), false);
            LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg",
                    "begin task txn encounter exception: " + e.getMessage()).build(), e);
            throw e;
        }

        // create thrift object
        TRoutineLoadTask tRoutineLoadTask = null;
        try {
            long startTime = System.currentTimeMillis();
            tRoutineLoadTask = routineLoadTaskInfo.createRoutineLoadTask();
            if (LOG.isDebugEnabled()) {
                LOG.debug("create routine load task cost(ms): {}, job id: {}",
                        (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
            }
        } catch (MetaNotFoundException e) {
            // this means database or table has been dropped, just stop this routine load job.
            // set BE id to -1 to release the BE slot
            routineLoadTaskInfo.setBeId(-1);
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
                    .updateState(JobState.CANCELLED,
                            new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR,
                                    "meta not found: " + e.getMessage()),
                            false);
            throw e;
        } catch (UserException e) {
            // set BE id to -1 to release the BE slot
            routineLoadTaskInfo.setBeId(-1);
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
                    .updateState(JobState.PAUSED,
                            new ErrorReason(e.getErrorCode(),
                                    "failed to create task: " + e.getMessage()), false);
            throw e;
        }

        try {
            long startTime = System.currentTimeMillis();
            submitTask(routineLoadTaskInfo.getBeId(), tRoutineLoadTask);
            if (LOG.isDebugEnabled()) {
                LOG.debug("send routine load task cost(ms): {}, job id: {}",
                        (System.currentTimeMillis() - startTime), routineLoadTaskInfo.getJobId());
            }
            if (tRoutineLoadTask.isSetKafkaLoadInfo()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("send kafka routine load task {} with partition offset: {}, job: {}",
                            tRoutineLoadTask.label, tRoutineLoadTask.kafka_load_info.partition_begin_offset,
                            tRoutineLoadTask.getJobId());
                }
            }
        } catch (LoadException e) {
            // submit task failed (such as TOO_MANY_TASKS error), but txn has already begun.
            // Here we will still set the ExecuteStartTime of this task, which means
            // we "assume" that this task has been successfully submitted.
            // And this task will then be aborted because of a timeout.
            // In this way, we can prevent the entire job from being paused due to submit errors,
            // and we can also relieve the pressure on BE by waiting for the timeout period.
            LOG.warn("failed to submit routine load task {} to BE: {}, error: {}",
                    DebugUtil.printId(routineLoadTaskInfo.getId()),
                    routineLoadTaskInfo.getBeId(), e.getMessage());
            routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).setOtherMsg(e.getMessage());
            // fall through to set ExecuteStartTime
        }

        // set the executeStartTimeMs of task
        routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
    }

    /**
     * Asks the routine load manager to refresh the max concurrent task num of each BE, on the first call
     * and afterwards whenever more than BACKEND_SLOT_UPDATE_INTERVAL_MS has passed since the last refresh.
     */
    private void updateBackendSlotIfNecessary() {
        long currentTime = System.currentTimeMillis();
        if (lastBackendSlotUpdateTime == -1
                || (currentTime - lastBackendSlotUpdateTime > BACKEND_SLOT_UPDATE_INTERVAL_MS)) {
            routineLoadManager.updateBeIdToMaxConcurrentTasks();
            lastBackendSlotUpdateTime = currentTime;
            if (LOG.isDebugEnabled()) {
                LOG.debug("update backend max slot for routine load task scheduling. current task num per BE: {}",
                        Config.max_routine_load_task_num_per_be);
            }
        }
    }

    /**
     * Appends one task to the tail of the schedule queue.
     *
     * @param routineLoadTaskInfo the task waiting to be scheduled
     */
    public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) {
        needScheduleTasksQueue.add(routineLoadTaskInfo);
        if (LOG.isDebugEnabled()) {
            LOG.debug("total tasks num in routine load task queue: {}", needScheduleTasksQueue.size());
        }
    }

    /**
     * Appends all given tasks to the tail of the schedule queue, in list order.
     *
     * @param routineLoadTaskInfoList the tasks waiting to be scheduled
     */
    public void addTasksInQueue(List<RoutineLoadTaskInfo> routineLoadTaskInfoList) {
        needScheduleTasksQueue.addAll(routineLoadTaskInfoList);
        if (LOG.isDebugEnabled()) {
            LOG.debug("total tasks num in routine load task queue: {}", needScheduleTasksQueue.size());
        }
    }

    /**
     * Sends the thrift task to the given BE through a client borrowed from the backend client pool.
     *
     * @param beId id of the BE which should execute the task
     * @param tTask the thrift task to submit
     * @throws LoadException if the BE does not exist, the rpc fails, or the BE returns a non-OK status
     */
    private void submitTask(long beId, TRoutineLoadTask tTask) throws LoadException {
        Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
        if (backend == null) {
            throw new LoadException("failed to send tasks to backend " + beId + " because not exist");
        }
        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());

        // ok means the rpc itself returned, so the client can be given back to the pool
        boolean ok = false;
        BackendService.Client client = null;
        try {
            client = ClientPool.backendPool.borrowObject(address);
            TStatus tStatus = client.submitRoutineLoadTask(Lists.newArrayList(tTask));
            ok = true;

            if (tStatus.getStatusCode() != TStatusCode.OK) {
                // this LoadException is wrapped again by the catch block below
                throw new LoadException("failed to submit task. error code: " + tStatus.getStatusCode()
                        + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? tStatus.getErrorMsgs().get(0) : "NaN"));
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("send routine load task {} to BE: {}", DebugUtil.printId(tTask.id), beId);
            }
        } catch (Exception e) {
            throw new LoadException("failed to send task: " + e.getMessage(), e);
        } finally {
            if (ok) {
                ClientPool.backendPool.returnObject(address, client);
            } else {
                ClientPool.backendPool.invalidateObject(address, client);
            }
        }
    }

    // try to allocate a task to BE which has idle slot.
    // The BE is chosen by routineLoadManager.getAvailableBeForTask() with the job id and the previous BE id
    // of the task. According to the original note of this method, the policy is:
    // 1. First is to check if the previous allocated BE has more than half of available slots.
    //    If yes, allocate task to previous BE.
    // 2. If not, try to find a better one with most idle slots.
    // return true if allocate successfully. return false if failed (no BE is available, id is -1).
    // throw exception if unrecoverable errors happen.
    private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
                routineLoadTaskInfo.getPreviousBeId());
        if (beId == -1L) {
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
                    .add("job_id", routineLoadTaskInfo.getJobId())
                    .add("previous_be_id", routineLoadTaskInfo.getPreviousBeId())
                    .add("assigned_be_id", beId)
                    .build());
        }
        routineLoadTaskInfo.setBeId(beId);
        return true;
    }
}