MTMVJobManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
 * {@link MTMVHookService} implementation that keeps the {@link MTMVJob} of an async materialized
 * view in step with operations on the view: it creates, drops, triggers, pauses and resumes the
 * job through the job manager obtained from {@code Env.getCurrentEnv()}.
 */
public class MTMVJobManager implements MTMVHookService {
    private static final Logger LOG = LogManager.getLogger(MTMVJobManager.class);

    public static final String MTMV_JOB_PREFIX = "inner_mtmv_";

    /**
     * Triggers the job once right after the MTMV is created, but only when the build mode is
     * {@code IMMEDIATE}. The task is submitted with trigger mode {@code SYSTEM}, no partition
     * list and the "complete" flag set.
     *
     * @param mtmv the newly created materialized view; its id is used as the job id
     */
    @Override
    public void postCreateMTMV(MTMV mtmv) {
        if (!mtmv.getRefreshInfo().getBuildMode().equals(BuildMode.IMMEDIATE)) {
            return;
        }
        MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM, null, true);
        try {
            Env.getCurrentEnv().getJobManager().triggerJob(mtmv.getId(), mtmvTaskContext);
        } catch (JobException e) {
            // should not happen
            LOG.warn("triggerJob failed by mvName: {}", mtmv.getName(), e);
        }
    }

    /**
     * Builds the job execution configuration from the refresh trigger of the MTMV:
     * {@code SCHEDULE} becomes a recurring job, {@code MANUAL} and {@code COMMIT} become a
     * manual job. Any other trigger leaves the execute type unset.
     */
    private JobExecutionConfiguration getJobConfig(MTMV mtmv) {
        JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
        RefreshTrigger refreshTrigger = mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger();
        // if immediate, mtmv will trigger it (see postCreateMTMV), no need for the job manager to deal with this
        jobExecutionConfiguration.setImmediate(false);
        if (refreshTrigger.equals(RefreshTrigger.SCHEDULE)) {
            setScheduleJobConfig(jobExecutionConfiguration, mtmv);
        } else if (refreshTrigger.equals(RefreshTrigger.MANUAL) || refreshTrigger.equals(RefreshTrigger.COMMIT)) {
            jobExecutionConfiguration.setExecuteType(JobExecuteType.MANUAL);
        }
        return jobExecutionConfiguration;
    }

    /**
     * Marks the configuration as recurring and copies interval, interval unit and (when present)
     * start time from the interval trigger of the MTMV into a new {@link TimerDefinition}.
     */
    private void setScheduleJobConfig(JobExecutionConfiguration jobExecutionConfiguration, MTMV mtmv) {
        jobExecutionConfiguration.setExecuteType(JobExecuteType.RECURRING);
        MTMVRefreshInfo refreshMTMVInfo = mtmv.getRefreshInfo();
        TimerDefinition timerDefinition = new TimerDefinition();
        timerDefinition
                .setInterval(refreshMTMVInfo.getRefreshTriggerInfo().getIntervalTrigger().getInterval());
        timerDefinition
                .setIntervalUnit(refreshMTMVInfo.getRefreshTriggerInfo().getIntervalTrigger().getTimeUnit());
        // Read the start time once; it is optional and only applied when non-empty.
        String startTime = refreshMTMVInfo.getRefreshTriggerInfo().getIntervalTrigger().getStartTime();
        if (!StringUtils.isEmpty(startTime)) {
            timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startTime));
        }
        jobExecutionConfiguration.setTimerDefinition(timerDefinition);
    }

    /** No job-side work is done for this hook. */
    @Override
    public void registerMTMV(MTMV mtmv, Long dbId) {
    }

    /** No job-side work is done for this hook. */
    @Override
    public void unregisterMTMV(MTMV mtmv) {
    }

    /**
     * Creates the job of the given MTMV and hands it to the job manager. A {@link JobException}
     * from the job manager is logged and not propagated.
     *
     * @param mtmv     the materialized view the job belongs to
     * @param isReplay passed through unchanged to {@code createJobInternal}
     */
    public void createJob(MTMV mtmv, boolean isReplay) {
        MTMVJob job = new MTMVJob(mtmv.getDatabase().getId(), mtmv.getId());
        // The jobId should remain constant, as it serves as the unique identifier when updating the job.
        job.setJobId(mtmv.getId());
        job.setJobName(mtmv.getJobInfo().getJobName());
        job.setCreateUser(UserIdentity.ADMIN);
        job.setJobStatus(JobStatus.RUNNING);
        job.setJobConfig(getJobConfig(mtmv));
        job.initParams();
        try {
            Env.getCurrentEnv().getJobManager().createJobInternal(job, isReplay);
        } catch (JobException e) {
            // should not happen
            LOG.warn("createJob failed by mvName: {}", mtmv.getName(), e);
        }
    }

    /**
     * Drops the job of the given MTMV. A {@link JobException} from the job manager is logged and
     * not propagated.
     *
     * @param mtmv     the materialized view whose job is dropped
     * @param isReplay passed through unchanged to {@code dropJobInternal}
     */
    public void dropJob(MTMV mtmv, boolean isReplay) {
        // NOTE(review): the lookup result is handed over as-is; whether dropJobInternal tolerates
        // a missing (null) job is not visible from this class - confirm against JobManager.
        MTMVJob job = getJobByMTMV(mtmv);
        try {
            Env.getCurrentEnv().getJobManager().dropJobInternal(job, isReplay);
        } catch (JobException e) {
            // should not happen
            LOG.warn("dropJob failed by mvName: {}", mtmv.getName(), e);
        }
    }

    /**
     * Replaces the job of the given MTMV by dropping it and creating it again, so the job picks
     * up the current refresh settings of the view.
     *
     * @param mtmv     the materialized view whose job is rebuilt
     * @param isReplay passed through unchanged to the drop and create steps
     */
    public void alterJob(MTMV mtmv, boolean isReplay) {
        dropJob(mtmv, isReplay);
        createJob(mtmv, isReplay);
    }

    /**
     * Manually triggers the job of the materialized view named in {@code info}, using the
     * partitions and "complete" flag carried by {@code info}.
     *
     * @param info the refresh request
     * @throws DdlException          if the database cannot be resolved
     * @throws MetaNotFoundException if the materialized view cannot be resolved
     * @throws JobException          if no job is registered for the view or triggering fails
     */
    @Override
    public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
        MTMVJob job = getJobByTableNameInfo(info.getMvName());
        MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
                info.isComplete());
        Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
    }

    /** No job-side work is done for this hook. */
    @Override
    public void refreshComplete(MTMV mtmv, MTMVRelation relation, MTMVTask task) {
    }

    /** No job-side work is done for this hook. */
    @Override
    public void dropTable(Table table) {
    }

    /** No job-side work is done for this hook. */
    @Override
    public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) {
    }

    /**
     * Sets the status of the job of the named materialized view to {@code PAUSED}.
     *
     * @throws DdlException          if the database cannot be resolved
     * @throws MetaNotFoundException if the materialized view cannot be resolved
     * @throws JobException          if no job is registered for the view or the status change fails
     */
    @Override
    public void pauseMTMV(PauseMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
        MTMVJob job = getJobByTableNameInfo(info.getMvName());
        Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.PAUSED);
    }

    /**
     * Sets the status of the job of the named materialized view to {@code RUNNING}.
     *
     * @throws DdlException          if the database cannot be resolved
     * @throws MetaNotFoundException if the materialized view cannot be resolved
     * @throws JobException          if no job is registered for the view or the status change fails
     */
    @Override
    public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
        MTMVJob job = getJobByTableNameInfo(info.getMvName());
        Env.getCurrentEnv().getJobManager().alterJobStatus(job.getJobId(), JobStatus.RUNNING);
    }

    /**
     * Cancels the task with the id carried by {@code info} on the job of the named materialized view.
     *
     * @throws DdlException          if the database cannot be resolved
     * @throws MetaNotFoundException if the materialized view cannot be resolved
     * @throws JobException          if no job is registered for the view or the cancel fails
     */
    @Override
    public void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException {
        MTMVJob job = getJobByTableNameInfo(info.getMvName());
        job.cancelTaskById(info.getTaskId());
    }

    /**
     * Triggers the job of the given MTMV with trigger mode {@code COMMIT}, an empty partition
     * list and the "complete" flag unset. Nothing is triggered unless the job status is
     * {@code RUNNING}.
     *
     * @param mtmv the materialized view to refresh
     * @throws DdlException declared for callers; not thrown directly by this method
     * @throws JobException if no job is registered for the view or triggering fails
     */
    public void onCommit(MTMV mtmv) throws DdlException, JobException {
        MTMVJob job = getExistingJob(mtmv);
        if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("job status of async materialized view: [{}] is: [{}], ignore this event.", mtmv.getName(),
                        job.getJobStatus());
            }
            return;
        }
        MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(),
                false);
        Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
    }

    /**
     * Resolves database and materialized view from the internal catalog by name and returns the
     * job registered for that view.
     */
    private MTMVJob getJobByTableNameInfo(TableNameInfo info)
            throws DdlException, MetaNotFoundException, JobException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
        MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW);
        return getExistingJob(mtmv);
    }

    /**
     * Returns the job registered for the given MTMV, failing with a descriptive
     * {@link JobException} instead of letting callers dereference a null lookup result.
     */
    private MTMVJob getExistingJob(MTMV mtmv) throws JobException {
        MTMVJob job = getJobByMTMV(mtmv);
        if (job == null) {
            throw new JobException("job of async materialized view not exist, mvName: " + mtmv.getName());
        }
        return job;
    }

    /** Looks the job up in the job manager by the MTMV id, which createJob also uses as the job id. */
    private MTMVJob getJobByMTMV(MTMV mtmv) {
        return (MTMVJob) Env.getCurrentEnv().getJobManager().getJob(mtmv.getId());
    }
}