TaskDisruptor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.exception.JobException;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.log4j.Log4j2;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
/**
* This class represents a disruptor for processing event tasks consumed by a Disruptor.
*
* <p>The work handler retrieves the associated event job and executes it if it is running.
* If the event job is not running, the work handler logs an error message. If the event job execution fails,
* the work handler logs an error message and pauses the event job.
*
* <p>The work handler also handles system events by scheduling batch scheduler tasks.
*/
@Log4j2
public class TaskDisruptor implements Closeable {
private Disruptor<TaskEvent> disruptor;
private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size;
private static final int consumerThreadCount = Config.async_task_consumer_thread_num;
/**
* The default timeout for {@link #close()} in seconds.
*/
private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
/**
* Whether this disruptor has been closed.
* if true, then we can't publish any more events.
*/
private boolean isClosed = false;
/**
* The default {@link EventTranslatorThreeArg} to use for {@link #tryPublish(Long, Long)}.
* This is used to avoid creating a new object for each publish.
*/
private static final EventTranslatorThreeArg<TaskEvent, Long, Long, TaskType> TRANSLATOR
= (event, sequence, jobId, taskId, taskType) -> {
event.setId(jobId);
event.setTaskId(taskId);
event.setTaskType(taskType);
};
public void start() {
CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer");
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS));
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
for (int i = 0; i < consumerThreadCount; i++) {
workers[i] = new TaskHandler();
}
disruptor.handleEventsWithWorkerPool(workers);
disruptor.start();
}
/**
* Publishes a job to the disruptor.
* Default task type is {@link TaskType#SCHEDULER_JOB_TASK}
*
* @param jobId job id
*/
public void tryPublish(Long jobId, Long taskId) {
this.tryPublish(jobId, taskId, TaskType.SCHEDULER_JOB_TASK);
}
/**
* Publishes a job task to the disruptor.
*
* @param jobId job id, describe which job this task belongs to
* @param taskId task id, it's linked to job id, we can get job detail by task id
* @param taskType {@link TaskType}
*/
public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId);
return;
}
try {
disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
} catch (Exception e) {
log.warn("tryPublish failed, jobId: {}", jobId, e);
}
}
/**
* Publishes a task to the disruptor.
* Default task type is {@link TaskType#TRANSIENT_TASK}
*
* @param taskId task id
*/
public void tryPublishTask(Long taskId) throws JobException {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId);
return;
}
// We reserve two slots in the ring buffer
// to prevent it from becoming stuck due to competition between producers and consumers.
if (disruptor.getRingBuffer().hasAvailableCapacity(2)) {
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK);
} else {
throw new JobException("There is not enough available capacity in the RingBuffer.");
}
}
@Override
public void close() {
try {
isClosed = true;
// we can wait for 5 seconds, so that backlog can be committed
disruptor.shutdown(DEFAULT_CLOSE_WAIT_TIME_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.warn("close disruptor failed", e);
}
}
}