MTMVJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.extensions.mtmv;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Job that refreshes one asynchronous materialized view (MTMV).
 *
 * <p>The target view is identified by {@code dbId} and {@code mtmvId}. Each call to
 * {@link #createTasks(TaskType, MTMVTaskContext)} produces exactly one {@link MTMVTask} for that
 * view. Task history is read from the {@link MTMV} object itself (see {@link #queryTasks()}), and
 * {@link #needPersist()} reports {@code false} for this job type.
 */
public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
    private static final Logger LOG = LogManager.getLogger(MTMVJob.class);

    /**
     * Maximum number of non-manual tasks that may be running at the same time.
     * See {@link #isReadyForScheduling(MTMVTaskContext)} for why this is 2 rather than 1.
     */
    private static final int MAX_RUNNING_NON_MANUAL_TASKS = 2;

    /** Fair read-write lock of this job, exposed through {@link #readLock()} / {@link #writeLock()}. */
    private final ReentrantReadWriteLock jobRwLock = new ReentrantReadWriteLock(true);

    /** Column layout of the rows produced by {@link #getShowInfo()}; keep both in the same order. */
    private static final ShowResultSetMetaData JOB_META_DATA =
            ShowResultSetMetaData.builder()
                    .addColumn(new Column("JobId", ScalarType.createVarchar(20)))
                    .addColumn(new Column("JobName", ScalarType.createVarchar(20)))
                    .addColumn(new Column("ExecuteType", ScalarType.createVarchar(20)))
                    .addColumn(new Column("RecurringStrategy", ScalarType.createVarchar(20)))
                    .addColumn(new Column("JobStatus", ScalarType.createVarchar(20)))
                    .addColumn(new Column("CreateTime", ScalarType.createVarchar(20)))
                    .addColumn(new Column("Comment", ScalarType.createVarchar(20)))
                    .build();

    /** Column layout of the rows produced by {@link #getTvfInfo()}; keep both in the same order. */
    public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
            new Column("Id", ScalarType.createStringType()),
            new Column("Name", ScalarType.createStringType()),
            new Column("MvId", ScalarType.createStringType()),
            new Column("MvName", ScalarType.createStringType()),
            new Column("MvDatabaseId", ScalarType.createStringType()),
            new Column("MvDatabaseName", ScalarType.createStringType()),
            new Column("ExecuteType", ScalarType.createStringType()),
            new Column("RecurringStrategy", ScalarType.createStringType()),
            new Column("Status", ScalarType.createStringType()),
            new Column("CreateTime", ScalarType.createStringType()));

    /** Lower-cased column name of {@link #SCHEMA} mapped to its position in that schema. */
    public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

    // Must stay textually after SCHEMA: static initializers run in declaration order.
    static {
        ImmutableMap.Builder<String, Integer> builder = ImmutableMap.builder();
        for (int i = 0; i < SCHEMA.size(); i++) {
            // NOTE(review): default-locale toLowerCase(); callers presumably lower-case their
            // lookup keys the same way, so switch to Locale.ROOT only together with them.
            builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
        }
        COLUMN_TO_INDEX = builder.build();
    }

    /** Column layout used when showing the tasks of this job. */
    private static final ShowResultSetMetaData TASK_META_DATA =
            ShowResultSetMetaData.builder()
                    .addColumn(new Column("JobId", ScalarType.createVarchar(20)))
                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
                    .addColumn(new Column("CreateTime", ScalarType.createVarchar(20)))
                    .addColumn(new Column("StartTime", ScalarType.createVarchar(20)))
                    .addColumn(new Column("FinishTime", ScalarType.createVarchar(20)))
                    .addColumn(new Column("DurationMs", ScalarType.createVarchar(20)))
                    .addColumn(new Column("ExecuteSql", ScalarType.createVarchar(20)))
                    .build();

    /** Id of the database that contains the materialized view. */
    @SerializedName(value = "di")
    private long dbId;

    /** Table id of the materialized view refreshed by this job. */
    @SerializedName(value = "mi")
    private long mtmvId;

    /** No-arg constructor; ids are expected to be filled in afterwards (e.g. by Gson). */
    public MTMVJob() {
    }

    /**
     * Creates a job for the given materialized view and stamps the current time as creation time.
     *
     * @param dbId id of the database containing the view
     * @param mtmvId table id of the materialized view
     */
    public MTMVJob(long dbId, long mtmvId) {
        this.dbId = dbId;
        this.mtmvId = mtmvId;
        super.setCreateTimeMs(System.currentTimeMillis());
    }

    @Override
    protected void checkJobParamsInternal() {
    }

    /**
     * Creates the single refresh task for this job's materialized view.
     *
     * @param taskType type assigned to the created task
     * @param taskContext context of the task; {@code null} means a {@code SYSTEM}-triggered refresh
     * @return a one-element list containing the new task, after {@code initTasks} has run on it
     */
    @Override
    public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext taskContext) {
        LOG.info("begin create mtmv task, jobId: {}, taskContext: {}", super.getJobId(), taskContext);
        if (taskContext == null) {
            taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
        }
        MTMVTask task = new MTMVTask(dbId, mtmvId, taskContext);
        task.setTaskType(taskType);
        ArrayList<MTMVTask> tasks = new ArrayList<>();
        tasks.add(task);
        super.initTasks(tasks, taskType);
        LOG.info("finish create mtmv task, task: {}", task);
        return tasks;
    }

    /**
     * Decides whether a new task with the given context may be scheduled now.
     *
     * <p>A manually triggered task is always accepted. Otherwise the task is rejected once
     * {@value #MAX_RUNNING_NON_MANUAL_TASKS} non-manual tasks are already running, because any task
     * that does run recomputes what needs refreshing.
     *
     * @param taskContext context of the task about to be scheduled; {@code null} counts as non-manual
     * @return {@code true} if the task may be scheduled, {@code false} if it should be dropped
     */
    @Override
    public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
        if (isManual(taskContext)) {
            return true;
        }
        // Prerequisite: every refresh computes, when it runs, which partitions need refreshing.
        //
        // Example: task t1 is running and refreshing partition p1. If p2 then changes and triggers
        // t2, a limit of 1 would drop t2 and the change to p2 would be missed.
        //
        // With a limit of 2, t2 is accepted and waits for the MTMVJob lock. If p3 then changes and
        // triggers t3, t3 is dropped. That is safe, because t2 refreshes both p2 and p3 when it runs.
        int runningNonManual = 0;
        for (MTMVTask task : getRunningTasks()) {
            if (isManual(task.getTaskContext())) {
                continue;
            }
            runningNonManual++;
            if (runningNonManual >= MAX_RUNNING_NON_MANUAL_TASKS) {
                LOG.warn("isReadyForScheduling return false, because {} non-manual tasks are already running,"
                        + " current taskContext: {}, exist task: {}", runningNonManual, taskContext, task);
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} only for a non-null context whose trigger mode is {@code MANUAL}. */
    private boolean isManual(MTMVTaskContext taskContext) {
        return taskContext != null && taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL;
    }

    @Override
    public ShowResultSetMetaData getJobMetaData() {
        return JOB_META_DATA;
    }

    @Override
    public ShowResultSetMetaData getTaskMetaData() {
        return TASK_META_DATA;
    }

    @Override
    public JobType getJobType() {
        return JobType.MV;
    }

    /**
     * Returns a copy of the materialized view's task history.
     *
     * @return the history tasks, or an empty list (with a warning logged) if the view cannot be found
     */
    @Override
    public List<MTMVTask> queryTasks() {
        MTMV mtmv;
        try {
            mtmv = getMTMV();
        } catch (DdlException | MetaNotFoundException e) {
            LOG.warn("get mtmv failed", e);
            return Lists.newArrayList();
        }
        return Lists.newArrayList(mtmv.getHistoryTasks());
    }

    /** Builds one row whose values follow the column order of {@link #JOB_META_DATA}. */
    @Override
    public List<String> getShowInfo() {
        List<String> data = Lists.newArrayList();
        data.add(String.valueOf(super.getJobId()));
        data.add(super.getJobName());
        data.add(super.getJobConfig().getExecuteType().name());
        data.add(super.getJobConfig().convertRecurringStrategyToString());
        data.add(super.getJobStatus().name());
        data.add(TimeUtils.longToTimeString(super.getCreateTimeMs()));
        data.add(super.getComment());
        return data;
    }

    @Override
    public String formatMsgWhenExecuteQueueFull(Long taskId) {
        return commonFormatMsgWhenExecuteQueueFull(taskId, "mtmv_task_queue_size",
                "job_mtmv_task_consumer_thread_num");
    }

    /**
     * Builds one row whose cells follow the column order of {@link #SCHEMA}.
     * If the view cannot be found, its name and database name are reported as empty strings.
     */
    @Override
    public TRow getTvfInfo() {
        TRow trow = new TRow();
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
        trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
        String dbName = "";
        String mvName = "";
        try {
            MTMV mtmv = getMTMV();
            dbName = mtmv.getQualifiedDbName();
            mvName = mtmv.getName();
        } catch (UserException e) {
            LOG.warn("can not find mv", e);
        }
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId)));
        trow.addToColumnValue(new TCell().setStringVal(mvName));
        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
        trow.addToColumnValue(new TCell().setStringVal(dbName));
        trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name()));
        trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString()));
        trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name()));
        trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
        return trow;
    }

    /**
     * Checks whether the user holds the wanted table privilege on this job's materialized view
     * in the internal catalog.
     *
     * @param userIdentity user to check
     * @param wanted privilege predicate to test
     * @return {@code false} if the view cannot be found, otherwise the result of the privilege check
     */
    public boolean hasPriv(UserIdentity userIdentity, PrivPredicate wanted) {
        MTMV mtmv;
        try {
            mtmv = getMTMV();
        } catch (UserException e) {
            LOG.warn("can not find mv", e);
            return false;
        }
        return Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME,
                        mtmv.getQualifiedDbName(), mtmv.getName(),
                        wanted);
    }

    /**
     * Looks up this job's materialized view in the internal catalog.
     *
     * @throws DdlException if the database {@code dbId} cannot be found
     * @throws MetaNotFoundException if table {@code mtmvId} cannot be found (presumably also when
     *         it is not a {@code MATERIALIZED_VIEW} — confirm against {@code getTableOrMetaException})
     */
    private MTMV getMTMV() throws DdlException, MetaNotFoundException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
        return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
    }

    public long getMtmvId() {
        return mtmvId;
    }

    @Override
    public boolean needPersist() {
        return false;
    }

    /** Acquires the read lock of this job. */
    public void readLock() {
        this.jobRwLock.readLock().lock();
    }

    /** Releases the read lock of this job. */
    public void readUnlock() {
        this.jobRwLock.readLock().unlock();
    }

    /** Acquires the write lock of this job. */
    public void writeLock() {
        this.jobRwLock.writeLock().lock();
    }

    /** Releases the write lock of this job. */
    public void writeUnlock() {
        this.jobRwLock.writeLock().unlock();
    }

    /** Writes this job as a single Gson-encoded JSON string. */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }
}