JobManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.manager;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
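/**
 * Manages the lifecycle of jobs: registration, scheduling, status changes,
 * edit-log replay, and persistence via {@link Writable}.
 */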
@Log4j2
public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
private JobScheduler<T, C> jobScheduler;
// lock for jobs
// the lock is private and must be acquired after the db lock
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public void start() {
jobScheduler = new JobScheduler<>(jobMap);
jobScheduler.start();
}
/**
 * Get a job by id.
 *
 * @param jobId job id
 * @return the job, or null if it does not exist
 */
public T getJob(long jobId) {
return jobMap.get(jobId);
}
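/**
 * Register a new job: initialize its parameters, store it, and schedule it.
 */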
public void registerJob(T job) throws JobException {
job.initParams();
createJobInternal(job, false);
jobScheduler.scheduleOneJob(job);
}
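/**
 * Put the job into the job map. When isReplay is true the call comes from
 * edit-log replay, so validation and persistence are skipped.
 */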
public void createJobInternal(T job, boolean isReplay) throws JobException {
writeLock();
try {
if (!isReplay) {
job.onRegister();
job.checkJobParams();
checkJobNameExist(job.getJobName());
if (jobMap.get(job.getJobId()) != null) {
throw new JobException("job id exist, jobId:" + job.getJobId());
}
}
jobMap.put(job.getJobId(), job);
if (isReplay) {
job.onReplayCreate();
}
if (!isReplay && job.needPersist()) {
job.logCreateOperation();
}
} finally {
writeUnlock();
}
if (!isReplay) {
jobScheduler.cycleTimerJobScheduler(job);
}
}
private void checkJobNameExist(String jobName) throws JobException {
if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) {
throw new JobException("job name exist, jobName:" + jobName);
}
}
/**
 * Unregister a job by job id. This removes the job from the job map.
 * If the job is running, it will be stopped and all of its running tasks cancelled.
 */
public void unregisterJob(Long jobId) throws JobException {
checkJobExist(jobId);
T dropJob = jobMap.get(jobId);
dropJob(dropJob, dropJob.getJobName());
}
/**
 * Unregister a job by job name. This removes the job from the job map.
 *
 * @param jobName job name
 * @param ifExists if true, a missing job is ignored; otherwise an exception is thrown
 */
public void unregisterJob(String jobName, boolean ifExists) throws JobException {
try {
T dropJob = null;
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
dropJob = job;
break;
}
}
if (dropJob == null && ifExists) {
return;
}
dropJob(dropJob, jobName);
} catch (Exception e) {
log.error("drop job error, jobName:" + jobName, e);
throw new JobException("unregister job error, jobName:" + jobName + ", msg:" + e.getMessage());
}
}
private void dropJob(T dropJob, String jobName) throws JobException {
if (dropJob == null) {
throw new JobException("job not exist, jobName:" + jobName);
}
dropJobInternal(dropJob, false);
}
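/**
 * Remove the job from the job map. A running job is stopped first so its running
 * tasks are cancelled; the delete operation is logged unless this is a replay.
 */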
public void dropJobInternal(T job, boolean isReplay) throws JobException {
if (!isReplay) {
// if the job status is running, we need to stop it and cancel all running tasks
// since jobs only run on master, we don't need to write an update metadata log
if (job.getJobStatus().equals(JobStatus.RUNNING)) {
job.updateJobStatus(JobStatus.STOPPED);
}
}
writeLock();
try {
jobMap.remove(job.getJobId());
if (isReplay) {
job.onReplayEnd(job);
}
// write delete log
if (!isReplay && job.needPersist()) {
job.logDeleteOperation();
}
} finally {
writeUnlock();
}
}
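/**
 * Update the status of the job with the given id and log the change; when resuming
 * to RUNNING, the job is re-registered with the timer scheduler.
 */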
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
if (status.equals(JobStatus.RUNNING)) {
jobScheduler.cycleTimerJobScheduler(jobMap.get(jobId));
}
jobMap.get(jobId).logUpdateOperation();
}
public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
if (jobStatus.equals(a.getJobStatus())) {
throw new JobException("Can't change job status to the same status");
}
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is %s, errorMsg is %s",
jobName, e.getMessage());
}
}
}
}
private void checkJobExist(Long jobId) throws JobException {
if (null == jobMap.get(jobId)) {
throw new JobException("job not exist, jobId:" + jobId);
}
}
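/**
 * Query all jobs of the given type.
 */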
public List<T> queryJobs(JobType type) {
return jobMap.values().stream().filter(a -> a.getJobType().equals(type))
.collect(Collectors.toList());
}
public List<T> queryJobs(JobType jobType, String jobName) {
// for now, only insert jobs are queried by name
return jobMap.values().stream().filter(a -> checkItsMatch(jobType, jobName, a))
.collect(Collectors.toList());
}
/**
 * Query jobs matching any of the given job types.
 *
 * @param jobTypes job types to match, see {@link JobType}
 * @return the matching jobs
 */
public List<T> queryJobs(List<JobType> jobTypes) {
return jobMap.values().stream().filter(a -> checkItsMatch(jobTypes, a))
.collect(Collectors.toList());
}
private boolean checkItsMatch(JobType jobType, String jobName, T job) {
if (null == jobType) {
throw new IllegalArgumentException("jobType cannot be null");
}
if (StringUtils.isBlank(jobName)) {
return job.getJobType().equals(jobType);
}
return job.getJobType().equals(jobType) && job.getJobName().equals(jobName);
}
private boolean checkItsMatch(List<JobType> jobTypes, T job) {
if (null == jobTypes) {
throw new IllegalArgumentException("jobType cannot be null");
}
return jobTypes.contains(job.getJobType());
}
/**
 * Actively trigger a job execution; the task created for this run has type MANUAL.
 *
 * @param jobId job id
 * @param context context information required by the tasks of this execution
 */
public void triggerJob(long jobId, C context) throws JobException {
log.info("trigger job, job id is {}", jobId);
checkJobExist(jobId);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context);
}
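/**
 * Replay a create-job edit log entry.
 */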
public void replayCreateJob(T job) throws JobException {
// mtmv has its own editLog to replay jobs, here it is to ignore the logs already generated by older versions.
if (!job.needPersist()) {
return;
}
createJobInternal(job, true);
}
/**
 * Replay an update-job edit log entry.
 */
public void replayUpdateJob(T job) {
Long jobId = job.getJobId();
// In previous versions, the job ID in MTMV was not fixed (a new ID would be generated each time the editLog
// was replayed), but the name was constant and unique. However, since job updates use jobId as the key,
// it is possible that this jobId no longer exists. Therefore, we now look up the ID based on the name.
if (!jobMap.containsKey(jobId) && job instanceof MTMVJob) {
List<T> jobs = queryJobs(JobType.MV, job.getJobName());
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
log.warn("jobs by name: {} abnormal, should have exactly one job, but job num is: {}",
job.getJobName(), jobs.size());
return;
}
jobId = jobs.get(0).getJobId();
job.setJobId(jobId);
}
if (!jobMap.containsKey(jobId)) {
LOG.warn("replayUpdateJob not normal, job: {}, jobId: {}, jobMap: {}", job, jobId, jobMap);
return;
}
jobMap.put(jobId, job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
.add("msg", "replay update scheduler job").build());
}
/**
 * Replay a delete-job edit log entry: remove the job from the job map.
 */
public void replayDeleteJob(T replayJob) throws JobException {
// mtmv has its own editLog to replay jobs, here it is to ignore the logs already generated by older version
if (!replayJob.needPersist()) {
return;
}
dropJobInternal(replayJob, true);
}
/**
 * Cancel a task by id; if the task is running it will be cancelled.
 * Throws JobException if the job or the task does not exist.
 *
 * @param jobName job name
 * @param taskId task id
 */
public void cancelTaskById(String jobName, Long taskId) throws JobException {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
job.logUpdateOperation();
return;
}
}
throw new JobException("job not exist, jobName:" + jobName);
}
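/**
 * Persist all jobs to the metadata image: the job count followed by each job.
 */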
@Override
public void write(DataOutput out) throws IOException {
// snapshot the values so the written count matches the entries actually written,
// and propagate IOException instead of silently swallowing it
List<T> jobs = new ArrayList<>(jobMap.values());
out.writeInt(jobs.size());
for (T job : jobs) {
job.write(out);
}
}
/**
* read job from data input, and init job
*
* @param in data input
* @throws IOException io exception when read data input error
*/
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
AbstractJob job = AbstractJob.readFields(in);
// for compatibility: old MTMV jobs used the mtmv id as the job id
if (job instanceof MTMVJob) {
job.setJobId(((MTMVJob) job).getMtmvId());
}
jobMap.putIfAbsent(job.getJobId(), (T) job);
}
}
/**
* get load info by db
*
* @param dbId db id
* @param dbName db name
* @param labelValue label name
* @param accurateMatch accurate match
* @param jobState state
* @return load infos
* @throws AnalysisException if filtering the jobs by label fails
*/
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
String labelValue,
boolean accurateMatch,
JobState jobState, String catalogName) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
if (!Env.getCurrentEnv().getLabelProcessor().existJobs(dbId)) {
return loadJobInfos;
}
readLock();
try {
List<InsertJob> loadJobList = Env.getCurrentEnv().getLabelProcessor()
.filterJobs(dbId, labelValue, accurateMatch);
// check state
for (InsertJob loadJob : loadJobList) {
try {
if (jobState != null && !validState(jobState, loadJob)) {
continue;
}
// check auth
try {
checkJobAuth(catalogName, dbName, loadJob.getTableNames());
} catch (AnalysisException e) {
continue;
}
// add load job info, convert String list to Comparable list
loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
} catch (RuntimeException e) {
// ignore this load job
log.warn("get load job info failed. job id: {}", loadJob.getJobId(), e);
}
}
return loadJobInfos;
} finally {
readUnlock();
}
}
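/**
 * Check LOAD privilege for a job: database-level when the job has no target
 * tables, otherwise table-level for each target table.
 */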
public void checkJobAuth(String ctlName, String dbName, Set<String> tableNames) throws AnalysisException {
if (tableNames.isEmpty()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), ctlName, dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED_ERROR,
PrivPredicate.LOAD.getPrivs().toString(), dbName);
}
} else {
for (String tblName : tableNames) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), ctlName, dbName,
tblName, PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_ACCESS_DENIED_ERROR,
PrivPredicate.LOAD.getPrivs().toString(), tblName);
}
}
}
}
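// map the job framework's JobStatus onto the legacy load JobState filter values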
private static boolean validState(JobState jobState, InsertJob loadJob) {
JobStatus status = loadJob.getJobStatus();
switch (status) {
case RUNNING:
return jobState == JobState.PENDING || jobState == JobState.ETL
|| jobState == JobState.LOADING || jobState == JobState.COMMITTED;
case STOPPED:
return jobState == JobState.CANCELLED;
case FINISHED:
return jobState == JobState.FINISHED;
default:
return false;
}
}
// TODO: this does not belong in JobManager
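/**
 * Handle CANCEL LOAD: find unfinished insert jobs matching the statement's
 * label/state filters, verify LOAD privilege, then stop each matching job.
 */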
public void cancelLoadJob(CancelLoadStmt cs)
throws JobException, AnalysisException, DdlException {
String dbName = cs.getDbName();
String label = cs.getLabel();
String state = cs.getState();
CompoundPredicate.Operator operator = cs.getOperator();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<InsertJob> unfinishedLoadJob;
readLock();
try {
List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db);
List<InsertJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new JobException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(InsertJob::isRunning)
.collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new JobException("There is no uncompleted job");
}
} finally {
readUnlock();
}
// check auth
if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName);
}
} else {
for (String tableName : unfinishedLoadJob.get(0).getTableNames()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ":" + tableName);
}
}
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED);
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}
}
}
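/**
 * Collect load jobs matching CANCEL LOAD's label/state filters. With a compound
 * predicate, AND requires both filters to match and OR requires either; labels
 * containing % are matched as MySQL-style patterns.
 */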
private static void addNeedCancelLoadJob(String label, String state,
CompoundPredicate.Operator operator, List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> !job.isCancelled())
.filter(job -> {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
return CompoundPredicate.Operator.AND.equals(operator) ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getJobStatus().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}
/**
 * Handle CANCEL LOAD for the Nereids planner.
 */
public void cancelLoadJob(String dbName, String label, String state,
Expression operator)
throws JobException, AnalysisException, DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<InsertJob> unfinishedLoadJob;
readLock();
try {
List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db);
List<InsertJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new JobException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(InsertJob::isRunning)
.collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new JobException("There is no uncompleted job");
}
} finally {
readUnlock();
}
// check auth
if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName);
}
} else {
for (String tableName : unfinishedLoadJob.get(0).getTableNames()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ":" + tableName);
}
}
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED);
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}
}
}
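// same filtering as the statement-based version above; the compound operator is a
// Nereids Expression, and an And expression means both filters must match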
private static void addNeedCancelLoadJob(String label, String state,
Expression operator,
List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> !job.isCancelled())
.filter(job -> {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
return operator instanceof And ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getJobStatus().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}
}