// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.load.PauseRoutineLoadCommand;
import org.apache.doris.nereids.trees.plans.commands.load.ResumeRoutineLoadCommand;
import org.apache.doris.nereids.trees.plans.commands.load.StopRoutineLoadCommand;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
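/**
 * RoutineLoadManager manages the lifecycle of routine load jobs: creating jobs,
 * changing their state (pause/resume/stop), accounting task slots on backends,
 * and cleaning up finished jobs. Job meta is persisted through the edit log so
 * that it can be replayed after an FE restart.
 */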
public class RoutineLoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class);
    // key: beId, value: the max number of concurrent routine load tasks allowed on this BE
private Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
// routine load job meta
private Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
private Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
private ConcurrentHashMap<Long, Long> multiLoadTaskTxnIdToRoutineLoadJobId = new ConcurrentHashMap<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
// Map<beId, timestamp when added to blacklist>
private Map<Long, Long> blacklist = new ConcurrentHashMap<>();
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public RoutineLoadManager() {
}
public Map<Long, Long> getBlacklist() {
return blacklist;
}
public List<RoutineLoadJob> getAllRoutineLoadJobs() {
return new ArrayList<>(idToRoutineLoadJob.values());
}
public List<RoutineLoadJob> getActiveRoutineLoadJobs() {
return idToRoutineLoadJob.values().stream()
.filter(job -> !job.state.isFinalState())
.collect(Collectors.toList());
}
public void addMultiLoadTaskTxnIdToRoutineLoadJobId(long txnId, long routineLoadJobId) {
multiLoadTaskTxnIdToRoutineLoadJobId.put(txnId, routineLoadJobId);
}
    public RoutineLoadJob getRoutineLoadJobByMultiLoadTaskTxnId(long txnId) {
        // use Long and a null check here: auto-unboxing a missing key into long would throw NPE
        Long routineLoadJobId = multiLoadTaskTxnIdToRoutineLoadJobId.get(txnId);
        if (routineLoadJobId == null) {
            return null;
        }
        return idToRoutineLoadJob.get(routineLoadJobId);
    }
public void removeMultiLoadTaskTxnIdToRoutineLoadJobId(long txnId) {
multiLoadTaskTxnIdToRoutineLoadJobId.remove(txnId);
}
public void updateBeIdToMaxConcurrentTasks() {
beIdToMaxConcurrentTasks = Env.getCurrentSystemInfo().getAllBackendIds(true).stream().collect(
Collectors.toMap(beId -> beId, beId -> Config.max_routine_load_task_num_per_be));
}
    // note: this is not a real-time number; it reflects the last call to updateBeIdToMaxConcurrentTasks()
public int getTotalMaxConcurrentTaskNum() {
return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
}
// return the map of be id -> running tasks num
private Map<Long, Integer> getBeCurrentTasksNumMap() {
Map<Long, Integer> beCurrentTaskNumMap = Maps.newHashMap();
for (RoutineLoadJob routineLoadJob : getRoutineLoadJobByState(
Sets.newHashSet(RoutineLoadJob.JobState.RUNNING))) {
Map<Long, Integer> jobBeCurrentTasksNumMap = routineLoadJob.getBeCurrentTasksNumMap();
            for (Map.Entry<Long, Integer> entry : jobBeCurrentTasksNumMap.entrySet()) {
                // accumulate per-BE task counts across all running jobs
                beCurrentTaskNumMap.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
}
return beCurrentTaskNumMap;
}
    // overridden in cloud mode (see CloudRoutineLoadManager)
public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt)
throws UserException {
// check load auth
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
createRoutineLoadStmt.getDBName(),
createRoutineLoadStmt.getTableName(),
PrivPredicate.LOAD)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
                    ConnectContext.get().getQualifiedUser(),
                    ConnectContext.get().getRemoteIP(),
                    createRoutineLoadStmt.getDBName() + ": " + createRoutineLoadStmt.getTableName());
}
RoutineLoadJob routineLoadJob = null;
LoadDataSourceType type = LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName());
switch (type) {
case KAFKA:
routineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
break;
default:
throw new UserException("Unknown data source type: " + type);
}
routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
routineLoadJob.setComment(createRoutineLoadStmt.getComment());
addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(),
createRoutineLoadStmt.getTableName());
}
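    // For reference, a statement that ends up in createRoutineLoadJob above could look
    // roughly like the following (a minimal sketch; the broker list, topic and property
    // values are hypothetical placeholders, not taken from this file):
    //
    //   CREATE ROUTINE LOAD db1.example_job ON example_tbl
    //   PROPERTIES ("desired_concurrent_number" = "3")
    //   FROM KAFKA (
    //       "kafka_broker_list" = "broker1:9092",
    //       "kafka_topic" = "example_topic"
    //   );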
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
throws UserException {
writeLock();
try {
// check if db.routineLoadName has been used
if (isNameUsed(routineLoadJob.getDbId(), routineLoadJob.getName())) {
throw new DdlException("Name " + routineLoadJob.getName() + " already used in db "
+ dbName);
}
if (getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE,
RoutineLoadJob.JobState.RUNNING, RoutineLoadJob.JobState.PAUSED)).size()
> Config.max_routine_load_job_num) {
throw new DdlException("There are more than " + Config.max_routine_load_job_num
+ " routine load jobs are running. exceed limit.");
}
unprotectedAddJob(routineLoadJob);
Env.getCurrentEnv().getEditLog().logCreateRoutineLoadJob(routineLoadJob);
} finally {
writeUnlock();
}
LOG.info("create routine load job: id: {}, job name: {}, db name: {}, table name: {}",
routineLoadJob.getId(), routineLoadJob.getName(), dbName, tableName);
}
private void unprotectedAddJob(RoutineLoadJob routineLoadJob) {
idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob
.computeIfAbsent(routineLoadJob.getDbId(), k -> Maps.newConcurrentMap());
List<RoutineLoadJob> routineLoadJobList = nameToRoutineLoadJob
.computeIfAbsent(routineLoadJob.getName(), k -> Lists.newArrayList());
routineLoadJobList.add(routineLoadJob);
// add txn state callback in factory
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(routineLoadJob);
}
// TODO(ml): Idempotency
private boolean isNameUsed(Long dbId, String name) {
if (dbToNameToRoutineLoadJob.containsKey(dbId)) {
Map<String, List<RoutineLoadJob>> labelToRoutineLoadJob = dbToNameToRoutineLoadJob.get(dbId);
if (labelToRoutineLoadJob.containsKey(name)) {
List<RoutineLoadJob> routineLoadJobList = labelToRoutineLoadJob.get(name);
Optional<RoutineLoadJob> optional = routineLoadJobList.stream()
.filter(entity -> entity.getName().equals(name))
.filter(entity -> !entity.getState().isFinalState()).findFirst();
return optional.isPresent();
}
}
return false;
}
public RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
throws MetaNotFoundException, DdlException, AnalysisException {
RoutineLoadJob routineLoadJob = getJob(dbName, jobName);
if (routineLoadJob == null) {
throw new DdlException("There is not operable routine load job with name " + jobName);
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (routineLoadJob.isMultiTable()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
dbFullName,
PrivPredicate.LOAD)) {
// todo add new error code
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
dbFullName);
}
return routineLoadJob;
}
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
dbFullName + ": " + tableName);
}
return routineLoadJob;
}
    // get all jobs whose state is not final from the specified database
public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
throws MetaNotFoundException, DdlException {
List<RoutineLoadJob> result = Lists.newArrayList();
Database database = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
long dbId = database.getId();
Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId);
if (jobMap == null) {
// return empty result
return result;
}
for (List<RoutineLoadJob> jobs : jobMap.values()) {
for (RoutineLoadJob job : jobs) {
if (!job.getState().isFinalState()) {
String tableName = job.getTableName();
if (!job.isMultiTable() && !Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tableName, PrivPredicate.LOAD)) {
continue;
}
result.add(job);
}
}
}
return result;
}
public void pauseRoutineLoadJob(PauseRoutineLoadCommand pauseRoutineLoadCommand)
throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
        // we need to hold the lock while getting the routine load job;
        // otherwise the editLog may go out of order in the following scenario:
        // thread A: create job and record job meta
        // thread B: change job state and persist it in the editLog according to the meta
        // thread A: persist the job creation in the editLog
        // which would cause a null pointer exception when replaying the editLog
readLock();
try {
if (pauseRoutineLoadCommand.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadCommand.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadCommand.getDbFullName(),
pauseRoutineLoadCommand.getLabel());
jobs.add(routineLoadJob);
}
} finally {
readUnlock();
}
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
} catch (UserException e) {
LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
                // if the user wants to pause a specific job and it fails, return the error.
                // if the user wants to pause all possible jobs, skip the failed ones.
if (!pauseRoutineLoadCommand.isAll()) {
throw e;
}
}
}
}
public void resumeRoutineLoadJob(ResumeRoutineLoadCommand resumeRoutineLoadCommand)
throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (resumeRoutineLoadCommand.isAll()) {
jobs = checkPrivAndGetAllJobs(resumeRoutineLoadCommand.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadCommand.getDbFullName(),
resumeRoutineLoadCommand.getLabel());
jobs.add(routineLoadJob);
}
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.latestResumeTimestamp = 0;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());
} catch (UserException e) {
LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
                // if the user wants to resume a specific job and it fails, return the error.
                // if the user wants to resume all possible jobs, skip the failed ones.
if (!resumeRoutineLoadCommand.isAll()) {
throw e;
}
}
}
}
public void stopRoutineLoadJob(StopRoutineLoadCommand stopRoutineLoadCommand)
throws UserException {
RoutineLoadJob routineLoadJob;
        // we need to hold the lock while getting the routine load job;
        // otherwise the editLog may go out of order in the following scenario:
        // thread A: create job and record job meta
        // thread B: change job state and persist it in the editLog according to the meta
        // thread A: persist the job creation in the editLog
        // which would cause a null pointer exception when replaying the editLog
readLock();
try {
routineLoadJob = checkPrivAndGetJob(stopRoutineLoadCommand.getDbFullName(),
stopRoutineLoadCommand.getLabel());
} finally {
readUnlock();
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been stopped by user")
.build());
}
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
        // we need to hold the lock while getting the routine load job;
        // otherwise the editLog may go out of order in the following scenario:
        // thread A: create job and record job meta
        // thread B: change job state and persist it in the editLog according to the meta
        // thread A: persist the job creation in the editLog
        // which would cause a null pointer exception when replaying the editLog
readLock();
try {
if (pauseRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}
} finally {
readUnlock();
}
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
} catch (UserException e) {
LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
                // if the user wants to pause a specific job and it fails, return the error.
                // if the user wants to pause all possible jobs, skip the failed ones.
                if (!pauseRoutineLoadStmt.isAll()) {
                    throw e;
                }
}
}
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (resumeRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.latestResumeTimestamp = 0;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());
} catch (UserException e) {
LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
                // if the user wants to resume a specific job and it fails, return the error.
                // if the user wants to resume all possible jobs, skip the failed ones.
                if (!resumeRoutineLoadStmt.isAll()) {
                    throw e;
                }
}
}
}
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob;
        // we need to hold the lock while getting the routine load job;
        // otherwise the editLog may go out of order in the following scenario:
        // thread A: create job and record job meta
        // thread B: change job state and persist it in the editLog according to the meta
        // thread A: persist the job creation in the editLog
        // which would cause a null pointer exception when replaying the editLog
readLock();
try {
routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
stopRoutineLoadStmt.getName());
} finally {
readUnlock();
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been stopped by user")
.build());
}
public int getSizeOfIdToRoutineLoadTask() {
int sizeOfTasks = 0;
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
sizeOfTasks += routineLoadJob.getSizeOfRoutineLoadTaskInfoList();
}
return sizeOfTasks;
}
public int getClusterIdleSlotNum() {
readLock();
try {
int result = 0;
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
for (Map.Entry<Long, Integer> entry : beIdToMaxConcurrentTasks.entrySet()) {
if (beIdToConcurrentTasks.containsKey(entry.getKey())) {
result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey());
} else {
result += entry.getValue();
}
}
return result;
} finally {
readUnlock();
}
}
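    // A worked example of the idle-slot arithmetic above (numbers are hypothetical):
    // with Config.max_routine_load_task_num_per_be = 5 and three alive BEs currently
    // running 2, 0 and 4 tasks, the cluster idle slot num is
    // (5 - 2) + (5 - 0) + (5 - 4) = 9.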
    // get the BE id with the most idle task slots (i.e. the fewest running tasks).
    // return -1 if no BE is available.
    // throw an exception if unrecoverable errors happen.
    // ATTN: this is only used for unit tests now.
public long getMinTaskBeId(String clusterName) throws LoadException {
List<Long> beIdsInCluster = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (beIdsInCluster == null) {
throw new LoadException("The " + clusterName + " has been deleted");
}
readLock();
try {
long result = -1L;
int maxIdleSlotNum = 0;
updateBeIdToMaxConcurrentTasks();
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
for (Long beId : beIdsInCluster) {
if (beIdToMaxConcurrentTasks.containsKey(beId)) {
int idleTaskNum = 0;
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
} else {
idleTaskNum = Config.max_routine_load_task_num_per_be;
}
if (LOG.isDebugEnabled()) {
LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
}
result = maxIdleSlotNum < idleTaskNum ? beId : result;
maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
}
}
return result;
} finally {
readUnlock();
}
}
    // select an available BE for a new task of the given job.
    // prefer the previous BE when it is completely idle; otherwise pick the BE with
    // the most idle slots. return -1 if no BE is available.
    // throw an exception if unrecoverable errors happen.
public long getAvailableBeForTask(long jobId, long previousBeId) throws LoadException {
List<Long> availableBeIds = getAvailableBackendIds(jobId);
// check if be has idle slot
readLock();
try {
updateBeIdToMaxConcurrentTasks();
Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
int previousBeIdleTaskNum = 0;
            // 1. reuse the previous BE directly if it is completely idle (all slots free)
if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
// get the previousBackend info
Backend previousBackend = Env.getCurrentSystemInfo().getBackend(previousBeId);
// check previousBackend is not null && load available
if (previousBackend != null && previousBackend.isLoadAvailable()) {
if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) {
previousBeIdleTaskNum = 0;
} else if (beIdToConcurrentTasks.containsKey(previousBeId)) {
previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId)
- beIdToConcurrentTasks.get(previousBeId);
} else {
previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId);
}
if (previousBeIdleTaskNum == Config.max_routine_load_task_num_per_be) {
return previousBeId;
}
}
}
            // 2. we believe the benefit of load balancing outweighs the benefit of the
            // object pool cache, so we prefer the BE with the most idle slots
            // 3. this also covers the case where the previous BE is not in the cluster
            // or is not load-available: find a new BE with the fewest running tasks
int idleTaskNum = 0;
long resultBeId = -1L;
int maxIdleSlotNum = 0;
for (Long beId : availableBeIds) {
if (beIdToMaxConcurrentTasks.containsKey(beId)) {
if (beIdToConcurrentTasks.containsKey(beId)) {
idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
} else {
idleTaskNum = Config.max_routine_load_task_num_per_be;
}
if (LOG.isDebugEnabled()) {
LOG.debug("be {} has idle {}, concurrent task {}, max concurrent task {}", beId, idleTaskNum,
beIdToConcurrentTasks.get(beId), beIdToMaxConcurrentTasks.get(beId));
}
resultBeId = maxIdleSlotNum < idleTaskNum ? beId : resultBeId;
maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum);
}
}
            // 4. among BEs with the maximum number of idle slots, prefer the previous
            // BE so its object cache can be reused
if (previousBeIdleTaskNum == maxIdleSlotNum) {
return previousBeId;
}
return resultBeId;
} finally {
readUnlock();
}
}
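    // A selection example for getAvailableBeForTask (hypothetical numbers): with
    // Config.max_routine_load_task_num_per_be = 5, previousBeId = 1 running 2 tasks
    // and BE 2 running 0 tasks, step 1 does not return BE 1 (3 idle slots != 5),
    // and steps 2-3 pick BE 2 with 5 idle slots. Only when the previous BE ties the
    // maximum idle count is it reused, to keep its object cache warm (step 4).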
// just for UT
public List<Long> getAvailableBackendIdsForUt(long jobId) throws LoadException {
return getAvailableBackendIds(jobId);
}
    /**
     * The routine load task can only be scheduled on backends which have proper resource tags.
     * The tags should be obtained from the user property.
     * But in older versions the routine load job does not carry user info, so for compatibility,
     * if there is no user info, we get the tags from the replica allocation of the first partition of the table.
     *
     * @param jobId id of the routine load job
     * @return ids of the backends available for this job
     * @throws LoadException
     */
protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
        // usually a cloud node should not reach here (see CloudRoutineLoadManager.getAvailableBackendIds);
        // checking cloud mode here is just to be on the safe side.
if (Config.isCloudMode()) {
throw new LoadException("cloud mode should not reach here");
}
RoutineLoadJob job = getJob(jobId);
if (job == null) {
throw new LoadException("job " + jobId + " does not exist");
}
Set<Tag> tags = null;
ComputeGroup computeGroup = null;
if (job.getUserIdentity() == null) {
            // for old jobs there may be no user info, so we have to use tags from the replica allocation
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build();
return Env.getCurrentSystemInfo()
.selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
} else {
computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(job.getUserIdentity().getQualifiedUser());
if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
                // the user may have been dropped, or may not have set the resource tag property.
                // here we fall back to the replica tags
tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
}
if (computeGroup != null && !ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
return Env.getCurrentSystemInfo()
.selectBackendIdsByPolicy(policy, -1 /* as many as possible */,
computeGroup.getBackendList());
} else {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build();
return Env.getCurrentSystemInfo()
.selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
}
}
}
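    // Decision summary for getAvailableBackendIds: jobs without user info (created by
    // old versions) and users without a valid compute group both fall back to the
    // replica tags of the job's table; otherwise the candidate BEs are restricted to
    // the user's compute group. In all cases only load-available BEs are selected.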
private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable tbl = (OlapTable) db.getTableOrMetaException(tblId, Table.TableType.OLAP);
tbl.readLock();
try {
PartitionInfo partitionInfo = tbl.getPartitionInfo();
for (Partition partition : tbl.getPartitions()) {
ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
// just use the first one
return replicaAlloc.getAllocMap().keySet();
}
                // should not reach here; just keep the compiler happy
return Sets.newHashSet();
} finally {
tbl.readUnlock();
}
} catch (MetaNotFoundException e) {
throw new LoadException(e.getMessage());
}
}
public RoutineLoadJob getJob(long jobId) {
return idToRoutineLoadJob.get(jobId);
}
public RoutineLoadJob getJob(String dbFullName, String jobName) throws MetaNotFoundException {
List<RoutineLoadJob> routineLoadJobList = getJob(dbFullName, jobName, false, null);
if (CollectionUtils.isEmpty(routineLoadJobList)) {
return null;
} else {
return routineLoadJobList.get(0);
}
}
    /*
     * if dbFullName is null, result = all routine load jobs in all dbs
     * else if jobName is null, result = all routine load jobs in dbFullName
     * if includeHistory is false, jobs in a final state are filtered out of the result
     * else all matching jobs are returned
     */
public List<RoutineLoadJob> getJob(String dbFullName, String jobName,
boolean includeHistory, PatternMatcher matcher)
throws MetaNotFoundException {
        Preconditions.checkArgument(jobName == null || matcher == null,
                "jobName and matcher cannot both be set");
        // collect the candidate jobs into result
List<RoutineLoadJob> result;
RESULT:
{ // CHECKSTYLE IGNORE THIS LINE
if (dbFullName == null) {
result = new ArrayList<>(idToRoutineLoadJob.values());
sortRoutineLoadJob(result);
break RESULT;
}
Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbFullName);
long dbId = database.getId();
if (!dbToNameToRoutineLoadJob.containsKey(dbId)) {
result = new ArrayList<>();
break RESULT;
}
if (jobName == null) {
result = Lists.newArrayList();
for (List<RoutineLoadJob> nameToRoutineLoadJob : dbToNameToRoutineLoadJob.get(dbId).values()) {
List<RoutineLoadJob> routineLoadJobList = new ArrayList<>(nameToRoutineLoadJob);
sortRoutineLoadJob(routineLoadJobList);
result.addAll(routineLoadJobList);
}
break RESULT;
}
if (dbToNameToRoutineLoadJob.get(dbId).containsKey(jobName)) {
result = new ArrayList<>(dbToNameToRoutineLoadJob.get(dbId).get(jobName));
sortRoutineLoadJob(result);
break RESULT;
}
return null;
} // CHECKSTYLE IGNORE THIS LINE
if (!includeHistory) {
result = result.stream().filter(entity -> !entity.getState().isFinalState()).collect(Collectors.toList());
}
if (matcher != null) {
result = result.stream().filter(entity -> matcher.match(entity.getName())).collect(Collectors.toList());
}
return result;
}
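    // Usage sketch (hypothetical caller, e.g. handling SHOW ALL ROUTINE LOAD LIKE "kafka%"
    // in db1; the matcher factory shown is illustrative and may differ from the actual API):
    //
    //   PatternMatcher matcher = PatternMatcher.createMysqlPattern("kafka%", false);
    //   List<RoutineLoadJob> jobs = routineLoadManager.getJob("db1", null,
    //           true /* includeHistory */, matcher);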
    // return all routine load jobs named jobName across all dbs
public List<RoutineLoadJob> getJobByName(String jobName) {
List<RoutineLoadJob> result = Lists.newArrayList();
for (Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob : dbToNameToRoutineLoadJob.values()) {
if (nameToRoutineLoadJob.containsKey(jobName)) {
List<RoutineLoadJob> routineLoadJobList = new ArrayList<>(nameToRoutineLoadJob.get(jobName));
sortRoutineLoadJob(routineLoadJobList);
result.addAll(routineLoadJobList);
}
}
return result;
}
    // move jobs in a final state (history jobs) to the end
private void sortRoutineLoadJob(List<RoutineLoadJob> routineLoadJobList) {
if (routineLoadJobList == null) {
return;
}
int i = 0;
int j = routineLoadJobList.size() - 1;
while (i < j) {
while (!routineLoadJobList.get(i).isFinal() && (i < j)) {
i++;
}
while (routineLoadJobList.get(j).isFinal() && (i < j)) {
j--;
}
if (i < j) {
RoutineLoadJob routineLoadJob = routineLoadJobList.get(i);
routineLoadJobList.set(i, routineLoadJobList.get(j));
routineLoadJobList.set(j, routineLoadJob);
}
}
}
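    // Example for sortRoutineLoadJob (F = job in final state, R = running job):
    // [F1, R1, F2, R2] is partitioned in place into [R2, R1, F2, F1] - all running
    // jobs first, all history jobs last; order inside each group is not preserved.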
public boolean checkTaskInJob(RoutineLoadTaskInfo task) {
RoutineLoadJob routineLoadJob = idToRoutineLoadJob.get(task.getJobId());
if (routineLoadJob == null) {
return false;
}
return routineLoadJob.containsTask(task.getId());
}
public List<RoutineLoadJob> getRoutineLoadJobByState(Set<RoutineLoadJob.JobState> desiredStates) {
List<RoutineLoadJob> stateJobs = idToRoutineLoadJob.values().stream()
.filter(entity -> desiredStates.contains(entity.getState())).collect(Collectors.toList());
return stateJobs;
}
    // RoutineLoadScheduler runs this method at a fixed interval to renew timed-out tasks
public void processTimeoutTasks() {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
routineLoadJob.processTimeoutTasks();
}
}
    // Remove old routine load jobs from idToRoutineLoadJob.
    // This function is called periodically.
    // Cancelled and stopped jobs will be removed after Config.label_keep_max_second seconds.
public void cleanOldRoutineLoadJobs() {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to clean old routine load jobs ");
}
clearRoutineLoadJobIf(RoutineLoadJob::isExpired);
}
    /**
     * Remove finished routine load jobs from idToRoutineLoadJob.
     * This function is called periodically if Config.label_num_threshold is set.
     * Cancelled and stopped jobs will be removed.
     */
public void cleanOverLimitRoutineLoadJobs() {
if (Config.label_num_threshold < 0
|| idToRoutineLoadJob.size() <= Config.label_num_threshold) {
return;
}
writeLock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to clean routine load jobs");
}
Deque<RoutineLoadJob> finishedJobs = idToRoutineLoadJob
.values()
.stream()
.filter(RoutineLoadJob::isFinal)
.sorted(Comparator.comparingLong(o -> o.endTimestamp))
.collect(Collectors.toCollection(ArrayDeque::new));
while (!finishedJobs.isEmpty()
&& idToRoutineLoadJob.size() > Config.label_num_threshold) {
RoutineLoadJob routineLoadJob = finishedJobs.pollFirst();
unprotectedRemoveJobFromDb(routineLoadJob);
idToRoutineLoadJob.remove(routineLoadJob.getId());
RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
routineLoadJob.getState());
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
}
} finally {
writeUnlock();
}
}
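    // Example for cleanOverLimitRoutineLoadJobs (hypothetical numbers): with
    // Config.label_num_threshold = 100 and 103 jobs of which 10 are final, the 3
    // final jobs with the smallest endTimestamp (the oldest ones) are removed,
    // bringing the total back down to 100.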
private void clearRoutineLoadJobIf(Predicate<RoutineLoadJob> pred) {
writeLock();
try {
Iterator<Map.Entry<Long, RoutineLoadJob>> iterator = idToRoutineLoadJob.entrySet().iterator();
long currentTimestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
RoutineLoadJob routineLoadJob = iterator.next().getValue();
if (pred.test(routineLoadJob)) {
unprotectedRemoveJobFromDb(routineLoadJob);
iterator.remove();
RoutineLoadOperation operation = new RoutineLoadOperation(routineLoadJob.getId(),
routineLoadJob.getState());
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
                    LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
                            .add("end_timestamp", routineLoadJob.getEndTimestamp())
                            .add("current_timestamp", currentTimestamp)
                            .add("job_state", routineLoadJob.getState())
                            .add("msg", "old job has been cleaned")
                            .build());
}
}
} finally {
writeUnlock();
}
}
public void replayRemoveOldRoutineLoad(RoutineLoadOperation operation) {
writeLock();
try {
RoutineLoadJob job = idToRoutineLoadJob.remove(operation.getId());
if (job != null) {
unprotectedRemoveJobFromDb(job);
}
} finally {
writeUnlock();
}
LOG.info("replay remove routine load job: {}", operation.getId());
}
    private void unprotectedRemoveJobFromDb(RoutineLoadJob routineLoadJob) {
        Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(routineLoadJob.getDbId());
        nameToRoutineLoadJob.get(routineLoadJob.getName()).remove(routineLoadJob);
        if (nameToRoutineLoadJob.get(routineLoadJob.getName()).isEmpty()) {
            nameToRoutineLoadJob.remove(routineLoadJob.getName());
        }
        if (nameToRoutineLoadJob.isEmpty()) {
            dbToNameToRoutineLoadJob.remove(routineLoadJob.getDbId());
        }
    }
public void updateRoutineLoadJob() throws UserException {
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
if (!routineLoadJob.state.isFinalState()) {
routineLoadJob.update();
}
}
}
public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
unprotectedAddJob(routineLoadJob);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("msg", "replay create routine load job")
.build());
}
public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) {
RoutineLoadJob job = getJob(operation.getId());
try {
job.updateState(operation.getJobState(), operation.getErrorReason(), true /* is replay */);
} catch (UserException e) {
LOG.error("should not happened", e);
} catch (NullPointerException npe) {
LOG.error("cannot get job when replaying state change job, which is unexpected, job id: "
+ operation.getId());
}
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
.add("current_state", operation.getJobState())
.add("msg", "replay change routine load job")
.build());
}
/**
     * Entry point for altering a routine load job
*/
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
RoutineLoadJob job;
        // we need to hold the lock while getting the routine load job;
        // otherwise the editLog may go out of order in the following scenario:
        // thread A: create job and record job meta
        // thread B: change job state and persist it in the editLog according to the meta
        // thread A: persist the job creation in the editLog
        // which would cause a null pointer exception when replaying the editLog
readLock();
try {
job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
} finally {
readUnlock();
}
if (stmt.hasDataSourceProperty()
&& !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) {
throw new DdlException("The specified job type is not: "
+ stmt.getDataSourceProperties().getDataSourceType());
}
job.modifyProperties(stmt);
}
public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) {
RoutineLoadJob job = getJob(log.getJobId());
Preconditions.checkNotNull(job, log.getJobId());
job.replayModifyProperties(log);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(idToRoutineLoadJob.size());
for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
routineLoadJob.write(out);
}
}
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
RoutineLoadJob routineLoadJob = RoutineLoadJob.read(in);
idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
            Map<String, List<RoutineLoadJob>> map = dbToNameToRoutineLoadJob
                    .computeIfAbsent(routineLoadJob.getDbId(), k -> Maps.newConcurrentMap());
            List<RoutineLoadJob> jobs = map.computeIfAbsent(routineLoadJob.getName(), k -> Lists.newArrayList());
            jobs.add(routineLoadJob);
if (!routineLoadJob.getState().isFinalState()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(routineLoadJob);
}
}
}
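    // Persistence round-trip sketch (variable names are hypothetical): write() is used
    // when checkpointing the FE image, and readFields() rebuilds the in-memory state:
    //
    //   routineLoadManager.write(dataOutput);     // serialize all jobs
    //   newManager.readFields(dataInput);         // rebuilds idToRoutineLoadJob and
    //                                             // dbToNameToRoutineLoadJob, and re-registers
    //                                             // txn callbacks for non-final jobs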
public void addToBlacklist(long beId) {
blacklist.put(beId, System.currentTimeMillis());
}
public boolean isInBlacklist(long beId) {
Long timestamp = blacklist.get(beId);
if (timestamp == null) {
return false;
}
        // use a long literal to avoid int overflow when converting seconds to milliseconds
        if (System.currentTimeMillis() - timestamp > Config.routine_load_blacklist_expire_time_second * 1000L) {
blacklist.remove(beId);
LOG.info("remove beId {} from blacklist, blacklist: {}", beId, blacklist);
return false;
}
return true;
}
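    // Blacklist expiry example (hypothetical numbers): with
    // Config.routine_load_blacklist_expire_time_second = 300, a BE blacklisted at time
    // t0 is reported as blacklisted until t0 + 300s; the first isInBlacklist call after
    // that point lazily removes the entry and returns false.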
}