MTMVService.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.event.Event;
import org.apache.doris.event.EventException;
import org.apache.doris.event.EventListener;
import org.apache.doris.event.TableEvent;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class MTMVService implements EventListener {
private static final Logger LOG = LogManager.getLogger(MTMVService.class);
private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
private MTMVRelationManager relationManager = new MTMVRelationManager();
private MTMVJobManager jobManager = new MTMVJobManager();
public MTMVService() {
registerHook("MTMVJobManager", jobManager);
registerHook("MTMVRelationManager", relationManager);
}
public MTMVRelationManager getRelationManager() {
return relationManager;
}
public void registerHook(String name, MTMVHookService mtmvHookService) {
Objects.requireNonNull(name);
Objects.requireNonNull(mtmvHookService);
hooks.put(name, mtmvHookService);
LOG.info("registerHook: " + name);
}
public void deregisterHook(String name) {
hooks.remove(name);
LOG.info("deregisterHook: " + name);
}
public void registerMTMV(MTMV mtmv, Long dbId) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
LOG.info("registerMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.registerMTMV(mtmv, dbId);
}
}
public void unregisterMTMV(MTMV mtmv) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
LOG.info("deregisterMTMV: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.unregisterMTMV(mtmv);
}
}
public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
Objects.requireNonNull(info, "info can not be null");
LOG.info("refreshMTMV, RefreshMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.refreshMTMV(info);
}
}
public void dropTable(Table table) {
Objects.requireNonNull(table, "table can not be null");
LOG.info("dropTable, tableName: {}", table.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.dropTable(table);
}
}
public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) {
Objects.requireNonNull(oldTableInfo, "oldTableInfo can not be null");
Objects.requireNonNull(newTableInfo, "newTableInfo can not be null");
LOG.info("alterTable, oldTableInfo: {}, newTableInfo: {}, isReplace: {}", oldTableInfo, newTableInfo,
isReplace);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.alterTable(oldTableInfo, newTableInfo, isReplace);
}
}
public void refreshComplete(MTMV mtmv, MTMVRelation cache, MTMVTask task) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
Objects.requireNonNull(task, "task can not be null");
LOG.info("refreshComplete: " + mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.refreshComplete(mtmv, cache, task);
}
}
public void pauseMTMV(PauseMTMVInfo info) throws DdlException, MetaNotFoundException, JobException {
Objects.requireNonNull(info, "info can not be null");
LOG.info("pauseMTMV, PauseMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.pauseMTMV(info);
}
}
public void resumeMTMV(ResumeMTMVInfo info) throws MetaNotFoundException, DdlException, JobException {
Objects.requireNonNull(info, "info can not be null");
LOG.info("resumeMTMV, ResumeMTMVInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.resumeMTMV(info);
}
}
public void cancelMTMVTask(CancelMTMVTaskInfo info) throws MetaNotFoundException, DdlException, JobException {
Objects.requireNonNull(info, "info can not be null");
LOG.info("cancelMTMVTask, CancelMTMVTaskInfo: {}", info);
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.cancelMTMVTask(info);
}
}
@Override
public void processEvent(Event event) throws EventException {
Objects.requireNonNull(event, "event can not be null");
if (!(event instanceof TableEvent)) {
return;
}
if (event instanceof DropPartitionEvent && ((DropPartitionEvent) event).isTempPartition()) {
return;
}
TableEvent tableEvent = (TableEvent) event;
LOG.info("processEvent, Event: {}", event);
TableIf table;
try {
table = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(tableEvent.getCtlName())
.getDbOrAnalysisException(tableEvent.getDbName())
.getTableOrAnalysisException(tableEvent.getTableName());
} catch (AnalysisException e) {
throw new EventException(e);
}
Set<BaseTableInfo> mtmvs = relationManager.getMtmvsByBaseTableOneLevel(
new BaseTableInfo(table));
for (BaseTableInfo baseTableInfo : mtmvs) {
try {
// check if mtmv should trigger by event
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
if (shouldRefreshOnBaseTableDataChange(mtmv, table)) {
jobManager.onCommit(mtmv);
}
} catch (Exception e) {
throw new EventException(e);
}
}
}
private boolean shouldRefreshOnBaseTableDataChange(MTMV mtmv, TableIf table) {
TableName tableName = null;
try {
tableName = new TableName(table);
} catch (AnalysisException e) {
LOG.warn("skip refresh mtmv: {}, because get TableName failed: {}",
mtmv.getName(), table.getName());
return false;
}
if (MTMVPartitionUtil.isTableExcluded(mtmv.getExcludedTriggerTables(), tableName)) {
LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}",
mtmv.getName(), table.getName());
return false;
}
return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT);
}
public void createJob(MTMV mtmv, boolean isReplay) {
jobManager.createJob(mtmv, isReplay);
}
public void dropJob(MTMV mtmv, boolean isReplay) {
jobManager.dropJob(mtmv, isReplay);
}
public void alterJob(MTMV mtmv, boolean isReplay) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
LOG.info("alterMTMV, mtmvName: {}", mtmv.getName());
jobManager.alterJob(mtmv, isReplay);
}
public void postCreateMTMV(MTMV mtmv) {
Objects.requireNonNull(mtmv, "mtmv can not be null");
LOG.info("postCreateMTMV, mtmvName: {}", mtmv.getName());
for (MTMVHookService mtmvHookService : hooks.values()) {
mtmvHookService.postCreateMTMV(mtmv);
}
}
}