MaterializedViewHandler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropRollupClause;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaContext;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.AlterCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelAlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/*
* MaterializedViewHandler is responsible for ADD/DROP materialized view.
* For compatible with older version, it is also responsible for ADD/DROP rollup.
* In function level, the mv completely covers the rollup in the future.
* In grammar level, there is some difference between mv and rollup.
*/
public class MaterializedViewHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class);
public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_";
public static int scheduler_interval_millisecond = 333;
public MaterializedViewHandler() {
super("materialized view", scheduler_interval_millisecond);
}
// for batch submit rollup job, tableId -> jobId
// keep table's not final state job size. The job size determine's table's state, = 0 means table is normal,
// otherwise is rollup
private Map<Long, Set<Long>> tableNotFinalStateJobMap = new ConcurrentHashMap<>();
// keep table's running job,used for concurrency limit
// table id -> set of running job ids
private Map<Long, Set<Long>> tableRunningJobMap = new ConcurrentHashMap<>();
@Override
public void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
super.addAlterJobV2(alterJob);
addAlterJobV2ToTableNotFinalStateJobMap(alterJob);
}
protected void batchAddAlterJobV2(List<AlterJobV2> alterJobV2List) throws AnalysisException {
for (AlterJobV2 alterJobV2 : alterJobV2List) {
addAlterJobV2(alterJobV2);
}
}
// return true iff job is actually added this time
private boolean addAlterJobV2ToTableNotFinalStateJobMap(AlterJobV2 alterJobV2) {
if (alterJobV2.isDone()) {
LOG.warn("try to add a final job({}) to a unfinal set. db: {}, tbl: {}",
alterJobV2.getJobId(), alterJobV2.getDbId(), alterJobV2.getTableId());
return false;
}
Long tableId = alterJobV2.getTableId();
Long jobId = alterJobV2.getJobId();
synchronized (tableNotFinalStateJobMap) {
Set<Long> tableNotFinalStateJobIdSet = tableNotFinalStateJobMap.get(tableId);
if (tableNotFinalStateJobIdSet == null) {
tableNotFinalStateJobIdSet = new HashSet<>();
tableNotFinalStateJobMap.put(tableId, tableNotFinalStateJobIdSet);
}
return tableNotFinalStateJobIdSet.add(jobId);
}
}
/**
*
* @param alterJobV2
* @return true iif we really removed a job from tableNotFinalStateJobMap,
* and there is no running job of this table
* false otherwise.
*/
private boolean removeAlterJobV2FromTableNotFinalStateJobMap(AlterJobV2 alterJobV2) {
Long tableId = alterJobV2.getTableId();
Long jobId = alterJobV2.getJobId();
synchronized (tableNotFinalStateJobMap) {
Set<Long> tableNotFinalStateJobIdset = tableNotFinalStateJobMap.get(tableId);
if (tableNotFinalStateJobIdset == null) {
// This could happen when this job is already removed before.
// return false, so that we will not set table's to NORMAL again.
return false;
}
tableNotFinalStateJobIdset.remove(jobId);
if (tableNotFinalStateJobIdset.size() == 0) {
tableNotFinalStateJobMap.remove(tableId);
return true;
}
return false;
}
}
/**
* There are 2 main steps in this function.
* Step1: validate the request.
* Step1.1: semantic analysis: the name of olapTable must be same as the base table name in addMVClause.
* Step1.2: base table validation: the status of base table and partition could be NORMAL.
* Step1.3: materialized view validation: the name and columns of mv is checked.
* Step2: create mv job
* @param addMVClause
* @param db
* @param olapTable
* @throws DdlException
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}
// Step1.1: semantic analysis
// TODO(ML): support the materialized view as base index
if (!addMVClause.getBaseIndexName().equals(olapTable.getName())) {
throw new DdlException("The name of table in from clause must be same as the name of alter table");
}
// Step1.2: base table validation
String baseIndexName = addMVClause.getBaseIndexName();
String mvIndexName = addMVClause.getMVName();
LOG.info("process add materialized view[{}] based on [{}]", mvIndexName, baseIndexName);
// avoid conflict against with batch add rollup job
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL);
long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
// Step1.3: mv clause validation
List<Column> mvColumns = checkAndPrepareMaterializedView(addMVClause, olapTable);
// Step2: create mv job
RollupJobV2 rollupJobV2 =
createMaterializedViewJob(addMVClause.toSql(), mvIndexName, baseIndexName, mvColumns,
addMVClause.getWhereClauseItemExpr(olapTable),
addMVClause.getProperties(), olapTable, db, baseIndexId,
addMVClause.getMVKeysType(), addMVClause.getOrigStmt());
addAlterJobV2(rollupJobV2);
olapTable.setState(OlapTableState.ROLLUP);
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
olapTable.writeUnlock();
}
}
/**
* There are 2 main steps in this function.
* Step1: validate the request.
* Step1.1: semantic analysis: the name of olapTable must be same as the base table name in createMvCommand.
* Step1.2: base table validation: the status of base table and partition could be NORMAL.
* Step1.3: materialized view validation: the name and columns of mv is checked.
* Step2: create mv job
* @param createMvCommand
* @param db
* @param olapTable
* @throws DdlException
*/
public void processCreateMaterializedView(CreateMaterializedViewCommand createMvCommand, Database db,
OlapTable olapTable) throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}
// Step1.1: semantic analysis
// TODO(ML): support the materialized view as base index
if (!createMvCommand.getBaseIndexName().equals(olapTable.getName())) {
throw new DdlException("The name of table in from clause must be same as the name of alter table");
}
// Step1.2: base table validation
String baseIndexName = createMvCommand.getBaseIndexName();
String mvIndexName = createMvCommand.getMVName();
LOG.info("process add materialized view[{}] based on [{}]", mvIndexName, baseIndexName);
// avoid conflict against with batch add rollup job
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL);
long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
// Step1.3: mv clause validation
List<Column> mvColumns = checkAndPrepareMaterializedView(createMvCommand, olapTable);
// Step2: create mv job
RollupJobV2 rollupJobV2 =
createMaterializedViewJob(null, mvIndexName, baseIndexName, mvColumns,
createMvCommand.getWhereClauseItemColumn(olapTable),
createMvCommand.getProperties(), olapTable, db, baseIndexId,
createMvCommand.getMVKeysType(), createMvCommand.getOriginStatement());
addAlterJobV2(rollupJobV2);
olapTable.setState(OlapTableState.ROLLUP);
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
olapTable.writeUnlock();
}
}
/**
* There are 2 main steps.
* Step1: validate the request
* Step1.1: base table validation: the status of base table and partition could be NORMAL.
* Step1.2: rollup validation: the name and columns of rollup is checked.
* Step2: create rollup job
* @param alterClauses
* @param db
* @param olapTable
* @throws DdlException
* @throws AnalysisException
*/
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}
// 1 check and make rollup job
for (AlterClause alterClause : alterClauses) {
AddRollupClause addRollupClause = (AddRollupClause) alterClause;
// step 1 check whether current alter is change storage format
String rollupIndexName = addRollupClause.getRollupName();
boolean changeStorageFormat = false;
if (rollupIndexName.equalsIgnoreCase(olapTable.getName())) {
// for upgrade test to create segment v2 rollup index by using the sql:
// alter table table_name add rollup table_name (columns) properties ("storage_format" = "v2");
Map<String, String> properties = addRollupClause.getProperties();
if (properties == null || !properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT)
|| !properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).equalsIgnoreCase("v2")) {
throw new DdlException("Table[" + olapTable.getName() + "] can not "
+ "add segment v2 rollup index without setting storage format to v2.");
}
rollupIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
changeStorageFormat = true;
}
// get base index schema
String baseIndexName = addRollupClause.getBaseRollupName();
if (baseIndexName == null) {
// use table name as base table name
baseIndexName = olapTable.getName();
}
// step 2 alter clause validation
// step 2.1 check whether base index already exists in catalog
long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
// step 2.2 check rollup schema
List<Column> rollupSchema = checkAndPrepareMaterializedView(
addRollupClause, olapTable, baseIndexId, changeStorageFormat);
// step 3 create rollup job
RollupJobV2 alterJobV2 =
createMaterializedViewJob(rawSql, rollupIndexName, baseIndexName, rollupSchema, null,
addRollupClause.getProperties(), olapTable, db, baseIndexId, olapTable.getKeysType(),
null);
rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2);
logJobIdSet.add(alterJobV2.getJobId());
}
// set table' state to ROLLUP before adding rollup jobs.
// so that when the AlterHandler thread run the jobs, it will see the expected table's state.
// ATTN: This order is not mandatory, because database lock will protect us,
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);
// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
BatchAlterJobPersistInfo batchAlterJobV2 = new BatchAlterJobPersistInfo(rollupJobV2List);
Env.getCurrentEnv().getEditLog().logBatchAlterJob(batchAlterJobV2);
LOG.info("finished to create materialized view job: {}", logJobIdSet);
} catch (Exception e) {
// remove tablet which has already inserted into TabletInvertedIndex
TabletInvertedIndex tabletInvertedIndex = Env.getCurrentInvertedIndex();
for (RollupJobV2 rollupJobV2 : rollupNameJobMap.values()) {
for (MaterializedIndex index : rollupJobV2.getPartitionIdToRollupIndex().values()) {
for (Tablet tablet : index.getTablets()) {
tabletInvertedIndex.deleteTablet(tablet.getId());
}
}
}
throw e;
} finally {
olapTable.writeUnlock();
}
}
/**
* Step1: All replicas of the materialized view index will be created in meta and added to TabletInvertedIndex
* Step2: Set table's state to ROLLUP.
*
* @param mvName
* @param baseIndexName
* @param mvColumns
* @param properties
* @param olapTable
* @param db
* @param baseIndexId
* @throws DdlException
* @throws AnalysisException
*/
private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, String baseIndexName,
List<Column> mvColumns, Column whereColumn, Map<String, String> properties,
OlapTable olapTable, Database db, long baseIndexId, KeysType mvKeysType,
OriginStatement origStmt) throws DdlException, AnalysisException {
if (mvKeysType == null) {
// assign rollup index's key type, same as base index's
mvKeysType = olapTable.getKeysType();
}
// get rollup schema hash
int mvSchemaHash = Util.generateSchemaHash();
// get short key column count
boolean isKeysRequired = !(mvKeysType == KeysType.DUP_KEYS);
short mvShortKeyColumnCount = Env.calcShortKeyColumnCount(mvColumns, properties, isKeysRequired);
if (mvShortKeyColumnCount <= 0 && olapTable.isDuplicateWithoutKey()) {
throw new DdlException("Not support create duplicate materialized view without order "
+ "by based on a duplicate table without keys");
}
// get timeout
long timeoutMs = PropertyAnalyzer.analyzeTimeout(properties, Config.alter_table_timeout_second) * 1000;
// create rollup job
long dbId = db.getId();
long tableId = olapTable.getId();
int baseSchemaHash = olapTable.getSchemaHashByIndexId(baseIndexId);
Env env = Env.getCurrentEnv();
long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, Sets.newHashSet(baseIndexId));
IdGeneratorBuffer idGeneratorBuffer = env.getIdGeneratorBuffer(bufferSize);
long jobId = idGeneratorBuffer.getNextId();
long mvIndexId = idGeneratorBuffer.getNextId();
RollupJobV2 mvJob = AlterJobV2Factory.createRollupJobV2(
rawSql, jobId, dbId, tableId, olapTable.getName(), timeoutMs,
baseIndexId, mvIndexId, baseIndexName, mvName,
mvColumns, whereColumn, baseSchemaHash, mvSchemaHash,
mvKeysType, mvShortKeyColumnCount, origStmt);
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
if (mvName.equals(newStorageFormatIndexName)) {
mvJob.setStorageFormat(TStorageFormat.V2);
} else {
// use base table's storage format as the mv's format
mvJob.setStorageFormat(olapTable.getStorageFormat());
}
/*
* create all rollup indexes. and set state.
* After setting, Tables' state will be ROLLUP
*/
List<Tablet> addedTablets = Lists.newArrayList();
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
// index state is SHADOW
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
for (Tablet baseTablet : baseIndex.getTablets()) {
TabletMeta mvTabletMeta = new TabletMeta(
dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId);
mvIndex.addTablet(newTablet, mvTabletMeta);
addedTablets.add(newTablet);
mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
List<Replica> baseReplicas = baseTablet.getReplicas();
int healthyReplicaNum = 0;
for (Replica baseReplica : baseReplicas) {
long mvReplicaId = idGeneratorBuffer.getNextId();
long backendId = baseReplica.getBackendIdWithoutException();
if (baseReplica.getState() == ReplicaState.CLONE
|| baseReplica.getState() == ReplicaState.DECOMMISSION
|| baseReplica.getState() == ReplicaState.COMPACTION_TOO_SLOW
|| baseReplica.getLastFailedVersion() > 0) {
LOG.info("base replica {} of tablet {} state is {}, and last failed version is {},"
+ " skip creating rollup replica", baseReplica.getId(), baseTabletId,
baseReplica.getState(), baseReplica.getLastFailedVersion());
continue;
}
Preconditions.checkState(baseReplica.getState() == ReplicaState.NORMAL,
baseReplica.getState());
ReplicaContext context = new ReplicaContext();
context.replicaId = mvReplicaId;
context.backendId = backendId;
context.state = ReplicaState.ALTER;
context.version = Partition.PARTITION_INIT_VERSION;
context.schemaHash = mvSchemaHash;
context.dbId = dbId;
context.tableId = tableId;
context.partitionId = partitionId;
context.indexId = mvIndexId;
context.originReplica = baseReplica;
// replica's init state is ALTER, so that tablet report process will ignore its report
Replica mvReplica = EnvFactory.getInstance().createReplica(context);
newTablet.addReplica(mvReplica);
healthyReplicaNum++;
} // end for baseReplica
if (healthyReplicaNum < replicationNum / 2 + 1) {
/*
* TODO(cmy): This is a bad design.
* Because in the rollup job, we will only send tasks to the rollup replicas that have been created,
* without checking whether the quorum of replica number are satisfied.
* This will cause the job to fail until we find that the quorum of replica number
* is not satisfied until the entire job is done.
* So here we check the replica number strictly and do not allow to submit the job
* if the quorum of replica number is not satisfied.
*/
for (Tablet tablet : addedTablets) {
Env.getCurrentInvertedIndex().deleteTablet(tablet.getId());
}
throw new DdlException("tablet " + baseTabletId + " has few healthy replica: " + healthyReplicaNum);
}
} // end for baseTablets
mvJob.addMVIndex(partitionId, mvIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("create materialized view index {} based on index {} in partition {}",
mvIndexId, baseIndexId, partitionId);
}
} // end for partitions
LOG.info("finished to create materialized view job: {}", mvJob.getJobId());
return mvJob;
}
private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewCommand createMvCommand,
OlapTable olapTable)
throws DdlException {
// check if mv index already exists
if (olapTable.hasMaterializedIndex(createMvCommand.getMVName())) {
throw new DdlException("Materialized view[" + createMvCommand.getMVName() + "] already exists");
}
if (olapTable.getRowStoreCol() != null) {
throw new DdlException("RowStore table can't create materialized view.");
}
// check if mv columns are valid
// a. Aggregate table:
// 1. all slot's aggregationType must same with value mv column
// 2. all slot's isKey must same with mv column
// 3. value column'define expr must be slot (except all slot belong replace family)
// b. Unique table:
// 1. mv must not contain group expr
// 2. all slot's isKey same with mv column
// 3. mv must contain all key column
// c. Duplicate table:
// 1. Columns resolved by semantics are legal
// 2. Key column not allow float/double type.
// update mv columns
List<MVColumnItem> mvColumnItemList = createMvCommand.getMVColumnItemList();
List<Column> newMVColumns = Lists.newArrayList();
if (olapTable.getKeysType().isAggregationFamily()) {
if (olapTable.getKeysType() == KeysType.AGG_KEYS && createMvCommand.getMVKeysType() != KeysType.AGG_KEYS) {
throw new DdlException("The materialized view of aggregation table must has grouping columns");
}
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& createMvCommand.getMVKeysType() == KeysType.AGG_KEYS) {
// check b.1
throw new DdlException("The materialized view of unique table must not has grouping columns");
}
for (MVColumnItem mvColumnItem : mvColumnItemList) {
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
mvColumnItem.setIsKey(false);
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey()) {
mvColumnItem.setIsKey(true);
}
}
if (!mvColumnItem.isKey()) {
mvColumnItem.setAggregationType(AggregateType.REPLACE, true);
}
}
// check a.2 and b.2
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey() != mvColumnItem.isKey()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s isKey must same with all slot, mvItem.isKey="
+ (mvColumnItem.isKey() ? "true" : "false"));
}
}
if (!mvColumnItem.isKey() && olapTable.getKeysType() == KeysType.AGG_KEYS) {
// check a.1
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.getAggregationType() != mvColumnItem.getAggregationType()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s AggregationType must same with all slot");
}
}
// check a.3
if (!mvColumnItem.getAggregationType().isReplaceFamily()
&& !(mvColumnItem.getDefineExpr() instanceof SlotRef)
&& !((mvColumnItem.getDefineExpr() instanceof CastExpr)
&& mvColumnItem.getDefineExpr().getChild(0) instanceof SlotRef)) {
throw new DdlException(
"The mvItem[" + mvColumnItem.getName() + "] require slot because it is value column");
}
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
} else {
for (MVColumnItem mvColumnItem : mvColumnItemList) {
Set<String> names = mvColumnItem.getBaseColumnNames();
if (names == null) {
throw new DdlException("Base columns is null");
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
}
for (Column column : newMVColumns) {
// check c.2
if (column.isKey() && column.getType().isFloatingPointType()) {
throw new DdlException("Do not support float/double type on key column, you can change it to decimal");
}
}
// check b.3
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
Set<String> originColumns = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
for (Column column : newMVColumns) {
originColumns.add(CreateMaterializedViewStmt.mvColumnBreaker(column.getName()));
}
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey() && !originColumns.contains(column.getName())) {
throw new DdlException("The materialized view of uniq table must contain all key columns. column:"
+ column.getName());
}
}
}
if (newMVColumns.size() == olapTable.getBaseSchema().size()
&& createMvCommand.getMVKeysType() == olapTable.getKeysType()) {
boolean allKeysMatch = true;
for (int i = 0; i < newMVColumns.size(); i++) {
if (!CreateMaterializedViewStmt.mvColumnBreaker(newMVColumns.get(i).getName())
.equalsIgnoreCase(olapTable.getBaseSchema().get(i).getName())) {
allKeysMatch = false;
break;
}
}
if (allKeysMatch) {
throw new DdlException("MV same with base table is useless.");
}
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasDeleteSign()) {
newMVColumns.add(new Column(olapTable.getDeleteSignColumn()));
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasSequenceCol()) {
newMVColumns.add(new Column(olapTable.getSequenceCol()));
}
if (olapTable.storeRowColumn()) {
Column newColumn = new Column(olapTable.getRowStoreCol());
newColumn.setAggregationType(AggregateType.NONE, true);
newMVColumns.add(newColumn);
}
// if the column is complex type, we forbid to create materialized view
for (Column column : newMVColumns) {
if (column.getDataType().isComplexType() || column.getDataType().isJsonbType()) {
throw new DdlException("The " + column.getDataType() + " column[" + column + "] not support "
+ "to create materialized view");
}
if (createMvCommand.getMVKeysType() != KeysType.AGG_KEYS
&& (column.getType().isBitmapType() || column.getType().isHllType())) {
throw new DdlException("Bitmap/HLL type only support aggregate table");
}
}
if (olapTable.getEnableLightSchemaChange()) {
int nextColUniqueId = Column.COLUMN_UNIQUE_ID_INIT_VALUE + 1;
for (Column column : newMVColumns) {
column.setUniqueId(nextColUniqueId);
nextColUniqueId++;
}
} else {
newMVColumns.forEach(column -> {
column.setUniqueId(Column.COLUMN_UNIQUE_ID_INIT_VALUE);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("lightSchemaChange:{}, newMVColumns:{}", olapTable.getEnableLightSchemaChange(), newMVColumns);
}
return newMVColumns;
}
private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt addMVClause, OlapTable olapTable)
throws DdlException {
// check if mv index already exists
if (olapTable.hasMaterializedIndex(addMVClause.getMVName())) {
throw new DdlException("Materialized view[" + addMVClause.getMVName() + "] already exists");
}
if (olapTable.getRowStoreCol() != null) {
throw new DdlException("RowStore table can't create materialized view.");
}
// check if mv columns are valid
// a. Aggregate table:
// 1. all slot's aggregationType must same with value mv column
// 2. all slot's isKey must same with mv column
// 3. value column'define expr must be slot (except all slot belong replace family)
// b. Unique table:
// 1. mv must not contain group expr
// 2. all slot's isKey same with mv column
// 3. mv must contain all key column
// c. Duplicate table:
// 1. Columns resolved by semantics are legal
// 2. Key column not allow float/double type.
// update mv columns
List<MVColumnItem> mvColumnItemList = addMVClause.getMVColumnItemList();
List<Column> newMVColumns = Lists.newArrayList();
if (olapTable.getKeysType().isAggregationFamily()) {
if (!addMVClause.isReplay() && olapTable.getKeysType() == KeysType.AGG_KEYS
&& addMVClause.getMVKeysType() != KeysType.AGG_KEYS) {
throw new DdlException("The materialized view of aggregation table must has grouping columns");
}
if (!addMVClause.isReplay() && olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& addMVClause.getMVKeysType() == KeysType.AGG_KEYS) {
// check b.1
throw new DdlException("The materialized view of unique table must not has grouping columns");
}
for (MVColumnItem mvColumnItem : mvColumnItemList) {
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
mvColumnItem.setIsKey(false);
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (!addMVClause.isReplay()
&& olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey()) {
mvColumnItem.setIsKey(true);
}
}
if (!mvColumnItem.isKey()) {
mvColumnItem.setAggregationType(AggregateType.REPLACE, true);
}
}
// check a.2 and b.2
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (!addMVClause.isReplay() && olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey() != mvColumnItem.isKey()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s isKey must same with all slot, mvItem.isKey="
+ (mvColumnItem.isKey() ? "true" : "false"));
}
}
if (!addMVClause.isReplay() && !mvColumnItem.isKey() && olapTable.getKeysType() == KeysType.AGG_KEYS) {
// check a.1
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.getAggregationType() != mvColumnItem.getAggregationType()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s AggregationType must same with all slot");
}
}
// check a.3
if (!mvColumnItem.getAggregationType().isReplaceFamily()
&& !(mvColumnItem.getDefineExpr() instanceof SlotRef)
&& !((mvColumnItem.getDefineExpr() instanceof CastExpr)
&& mvColumnItem.getDefineExpr().getChild(0) instanceof SlotRef)) {
throw new DdlException(
"The mvItem[" + mvColumnItem.getName() + "] require slot because it is value column");
}
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
} else {
for (MVColumnItem mvColumnItem : mvColumnItemList) {
Set<String> names = mvColumnItem.getBaseColumnNames();
if (names == null) {
throw new DdlException("Base columns is null");
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
}
for (Column column : newMVColumns) {
// check c.2
if (column.isKey() && column.getType().isFloatingPointType()) {
throw new DdlException("Do not support float/double type on key column, you can change it to decimal");
}
}
// check b.3
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && !addMVClause.isReplay()) {
Set<String> originColumns = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
for (Column column : newMVColumns) {
originColumns.add(CreateMaterializedViewStmt.mvColumnBreaker(column.getName()));
}
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey() && !originColumns.contains(column.getName())) {
throw new DdlException("The materialized view of uniq table must contain all key columns. column:"
+ column.getName());
}
}
}
if (newMVColumns.size() == olapTable.getBaseSchema().size() && !addMVClause.isReplay()
&& addMVClause.getMVKeysType() == olapTable.getKeysType()) {
boolean allKeysMatch = true;
for (int i = 0; i < newMVColumns.size(); i++) {
if (!CreateMaterializedViewStmt.mvColumnBreaker(newMVColumns.get(i).getName())
.equalsIgnoreCase(olapTable.getBaseSchema().get(i).getName())) {
allKeysMatch = false;
break;
}
}
if (allKeysMatch) {
throw new DdlException("MV same with base table is useless.");
}
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasDeleteSign()) {
newMVColumns.add(new Column(olapTable.getDeleteSignColumn()));
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasSequenceCol()) {
newMVColumns.add(new Column(olapTable.getSequenceCol()));
}
if (olapTable.storeRowColumn()) {
Column newColumn = new Column(olapTable.getRowStoreCol());
newColumn.setAggregationType(AggregateType.NONE, true);
newMVColumns.add(newColumn);
}
// if the column is complex type, we forbid to create materialized view
for (Column column : newMVColumns) {
if (column.getDataType().isComplexType() || column.getDataType().isJsonbType()) {
throw new DdlException("The " + column.getDataType() + " column[" + column + "] not support "
+ "to create materialized view");
}
if (addMVClause.getMVKeysType() != KeysType.AGG_KEYS
&& (column.getType().isBitmapType() || column.getType().isHllType())) {
throw new DdlException("Bitmap/HLL type only support aggregate table");
}
}
if (olapTable.getEnableLightSchemaChange()) {
int nextColUniqueId = Column.COLUMN_UNIQUE_ID_INIT_VALUE + 1;
for (Column column : newMVColumns) {
column.setUniqueId(nextColUniqueId);
nextColUniqueId++;
}
} else {
newMVColumns.forEach(column -> {
column.setUniqueId(Column.COLUMN_UNIQUE_ID_INIT_VALUE);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("lightSchemaChange:{}, newMVColumns:{}", olapTable.getEnableLightSchemaChange(), newMVColumns);
}
return newMVColumns;
}
public List<Column> checkAndPrepareMaterializedView(AddRollupClause addRollupClause, OlapTable olapTable,
long baseIndexId, boolean changeStorageFormat)
throws DdlException {
if (olapTable.getRowStoreCol() != null) {
throw new DdlException("RowStore table can't create materialized view.");
}
String rollupIndexName = addRollupClause.getRollupName();
List<String> rollupColumnNames = addRollupClause.getColumnNames();
if (changeStorageFormat) {
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
rollupIndexName = newStorageFormatIndexName;
// Must get all columns including invisible columns.
// Because in alter process, all columns must be considered.
List<Column> columns = olapTable.getSchemaByIndexId(baseIndexId, true);
// create the same schema as base table
rollupColumnNames.clear();
for (Column column : columns) {
rollupColumnNames.add(column.getName());
}
}
// 2. check if rollup index already exists
if (olapTable.hasMaterializedIndex(rollupIndexName)) {
throw new DdlException("Rollup index[" + rollupIndexName + "] already exists");
}
// 3. check if rollup columns are valid
// a. all columns should exist in base rollup schema
// b. value after key
// c. if rollup contains REPLACE column, all keys on base index should be included.
// d. if base index has sequence column for unique_keys, rollup should add the sequence column
List<Column> rollupSchema = new ArrayList<Column>();
// check (a)(b)
boolean meetValue = false;
boolean hasKey = false;
boolean meetReplaceValue = false;
KeysType keysType = olapTable.getKeysType();
Map<String, Column> baseColumnNameToColumn = Maps.newHashMap();
for (Column column : olapTable.getSchemaByIndexId(baseIndexId, true)) {
baseColumnNameToColumn.put(column.getName(), column);
}
if (keysType.isAggregationFamily()) {
int keysNumOfRollup = 0;
for (String columnName : rollupColumnNames) {
Column baseColumn = baseColumnNameToColumn.get(columnName);
if (baseColumn == null) {
throw new DdlException("Column[" + columnName + "] does not exist");
}
if (baseColumn.isKey() && meetValue) {
throw new DdlException("Invalid column order. value should be after key");
}
if (baseColumn.isKey()) {
if (baseColumn.getType().isFloatingPointType()) {
throw new DdlException(
"Do not support float/double type on group by, you can change it to decimal");
}
keysNumOfRollup += 1;
hasKey = true;
} else {
meetValue = true;
if (baseColumn.getAggregationType().isReplaceFamily()) {
meetReplaceValue = true;
}
}
Column oneColumn = new Column(baseColumn);
rollupSchema.add(oneColumn);
}
if (!hasKey) {
throw new DdlException("No key column is found");
}
if (KeysType.UNIQUE_KEYS == keysType || meetReplaceValue) {
// rollup of unique key table or rollup with REPLACE value
// should have all keys of base table
if (keysNumOfRollup != olapTable.getKeysNum()) {
if (KeysType.UNIQUE_KEYS == keysType) {
throw new DdlException("Rollup should contains all unique keys in basetable");
} else {
throw new DdlException("Rollup should contains all keys if there is a REPLACE value");
}
}
// add hidden column to rollup table
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasDeleteSign()) {
rollupSchema.add(new Column(olapTable.getDeleteSignColumn()));
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasSequenceCol()) {
rollupSchema.add(new Column(olapTable.getSequenceCol()));
}
}
// check useless rollup of same key columns and same order with base table
if (keysNumOfRollup == olapTable.getKeysNum()) {
boolean allKeysMatch = true;
for (int i = 0; i < keysNumOfRollup; i++) {
if (!rollupSchema.get(i).getName()
.equalsIgnoreCase(olapTable.getSchemaByIndexId(baseIndexId, true).get(i).getName())) {
allKeysMatch = false;
break;
}
}
if (allKeysMatch) {
throw new DdlException("Rollup contains all keys in base table with same order for "
+ "aggregation or unique table is useless.");
}
}
} else if (KeysType.DUP_KEYS == keysType) {
// supplement the duplicate key
if (addRollupClause.getDupKeys() == null || addRollupClause.getDupKeys().isEmpty()) {
// check the column meta
boolean allColumnsMatch = true;
for (int i = 0; i < rollupColumnNames.size(); i++) {
String columnName = rollupColumnNames.get(i);
if (!columnName.equalsIgnoreCase(olapTable.getSchemaByIndexId(baseIndexId, true).get(i).getName())
&& olapTable.getSchemaByIndexId(baseIndexId, true).get(i).isKey()) {
allColumnsMatch = false;
}
Column baseColumn = baseColumnNameToColumn.get(columnName);
if (baseColumn == null) {
throw new DdlException("Column[" + columnName + "] does not exist in base index");
}
Column rollupColumn = new Column(baseColumn);
rollupSchema.add(rollupColumn);
}
if (changeStorageFormat) {
return rollupSchema;
}
// Supplement key of MV columns
int theBeginIndexOfValue = 0;
int keySizeByte = 0;
for (; theBeginIndexOfValue < rollupSchema.size(); theBeginIndexOfValue++) {
Column column = rollupSchema.get(theBeginIndexOfValue);
keySizeByte += column.getType().getIndexSize();
if (theBeginIndexOfValue + 1 > FeConstants.shortkey_max_column_count
|| keySizeByte > FeConstants.shortkey_maxsize_bytes) {
if (theBeginIndexOfValue != 0 || !column.getType().getPrimitiveType().isCharFamily()) {
break;
}
}
if (!column.getType().couldBeShortKey()) {
break;
}
column.setIsKey(true);
if (column.getType().getPrimitiveType() == PrimitiveType.VARCHAR) {
theBeginIndexOfValue++;
break;
}
}
if (theBeginIndexOfValue == 0) {
throw new DdlException("The first column could not be float or double");
}
// Supplement value of MV columns
for (; theBeginIndexOfValue < rollupSchema.size(); theBeginIndexOfValue++) {
Column rollupColumn = rollupSchema.get(theBeginIndexOfValue);
rollupColumn.setIsKey(false);
rollupColumn.setAggregationType(AggregateType.NONE, true);
}
if (allColumnsMatch) {
throw new DdlException("Rollup contain the columns of the base table in prefix order for "
+ "duplicate table is useless.");
}
} else {
/*
* eg.
* Base Table's schema is (k1,k2,k3,k4,k5) dup key (k1,k2,k3).
* The following rollup is allowed:
* 1. (k1) dup key (k1)
*
* The following rollup is forbidden:
* 1. (k1) dup key (k2)
* 2. (k2,k3) dup key (k3,k2)
* 3. (k1,k2,k3) dup key (k2,k3)
*
* The following rollup is useless so forbidden too:
* 1. (k1,k2,k3) dup key (k1,k2,k3)
* 3. (k1,k2,k3) dup key (k1,k2)
* 1. (k1) dup key (k1)
*/
// user specify the duplicate keys for rollup index
List<String> dupKeys = addRollupClause.getDupKeys();
if (dupKeys.size() > rollupColumnNames.size()) {
throw new DdlException("Num of duplicate keys should less than or equal to num of rollup columns.");
}
boolean allColumnsMatch = true;
for (int i = 0; i < rollupColumnNames.size(); i++) {
String rollupColName = rollupColumnNames.get(i);
boolean isKey = false;
if (i < dupKeys.size()) {
String dupKeyName = dupKeys.get(i);
if (!rollupColName.equalsIgnoreCase(dupKeyName)) {
throw new DdlException("Duplicate keys should be the prefix of rollup columns");
}
isKey = true;
}
if (!rollupColName.equalsIgnoreCase(
olapTable.getSchemaByIndexId(baseIndexId, true).get(i).getName())
&& olapTable.getSchemaByIndexId(baseIndexId, true).get(i).isKey()) {
allColumnsMatch = false;
}
Column baseColumn = baseColumnNameToColumn.get(rollupColName);
if (baseColumn == null) {
throw new DdlException("Column[" + rollupColName + "] does not exist");
}
if (isKey && meetValue) {
throw new DdlException("Invalid column order. key should before all values: " + rollupColName);
}
Column oneColumn = new Column(baseColumn);
if (isKey) {
hasKey = true;
oneColumn.setIsKey(true);
oneColumn.setAggregationType(null, false);
} else {
meetValue = true;
oneColumn.setIsKey(false);
oneColumn.setAggregationType(AggregateType.NONE, true);
}
rollupSchema.add(oneColumn);
}
if (allColumnsMatch) {
throw new DdlException("Rollup contain the columns of the base table in prefix order for "
+ "duplicate table is useless.");
}
}
}
if (olapTable.getEnableLightSchemaChange()) {
int nextColUniqueId = Column.COLUMN_UNIQUE_ID_INIT_VALUE + 1;
for (Column column : rollupSchema) {
column.setUniqueId(nextColUniqueId);
nextColUniqueId++;
}
} else {
rollupSchema.forEach(column -> {
column.setUniqueId(Column.COLUMN_UNIQUE_ID_INIT_VALUE);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("lightSchemaChange:{}, rollupSchema:{}, baseSchema:{}",
olapTable.getEnableLightSchemaChange(),
rollupSchema, olapTable.getSchemaByIndexId(baseIndexId, true));
}
return rollupSchema;
}
/**
*
* @param baseIndexName
* @param olapTable
* @return
* @throws DdlException
*/
private long checkAndGetBaseIndex(String baseIndexName, OlapTable olapTable) throws DdlException {
// up to here, table's state can only be NORMAL
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name());
Long baseIndexId = olapTable.getIndexIdByName(baseIndexName);
if (baseIndexId == null) {
throw new DdlException("Base index[" + baseIndexName + "] does not exist");
}
// check state
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
// up to here. index's state should only be NORMAL
Preconditions.checkState(baseIndex.getState() == IndexState.NORMAL, baseIndex.getState().name());
}
return baseIndexId;
}
public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database db, OlapTable olapTable)
throws DdlException, MetaNotFoundException {
List<Long> deleteIndexList = null;
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}
// check drop rollup index operation
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause) alterClause;
checkDropMaterializedView(dropRollupClause.getRollupName(), olapTable);
}
// drop data in memory
Map<Long, String> rollupNameMap = new HashMap<>();
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause) alterClause;
String rollupIndexName = dropRollupClause.getRollupName();
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable);
rollupNameMap.put(rollupIndexId, rollupIndexName);
}
// batch log drop rollup operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
long dbId = db.getId();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, rollupNameMap));
deleteIndexList = rollupNameMap.keySet().stream().collect(Collectors.toList());
LOG.info("finished drop rollup index[{}] in table[{}]",
String.join("", rollupNameMap.values()), olapTable.getName());
} finally {
olapTable.writeUnlock();
}
Env.getCurrentInternalCatalog().eraseDroppedIndex(olapTable.getId(), deleteIndexList);
}
public void processDropMaterializedView(DropMaterializedViewStmt dropMaterializedViewStmt, Database db,
OlapTable olapTable) throws DdlException, MetaNotFoundException {
List<Long> deleteIndexList = new ArrayList<Long>();
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
String mvName = dropMaterializedViewStmt.getMvName();
// Step1: check drop mv index operation
checkDropMaterializedView(mvName, olapTable);
// Step2; drop data in memory
long mvIndexId = dropMaterializedView(mvName, olapTable);
// Step3: log drop mv operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(),
mvIndexId, mvName, false, false, 0));
deleteIndexList.add(mvIndexId);
LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName());
} catch (MetaNotFoundException e) {
if (dropMaterializedViewStmt.isIfExists()) {
LOG.info(e.getMessage());
} else {
throw e;
}
} finally {
olapTable.writeUnlock();
}
Env.getCurrentInternalCatalog().eraseDroppedIndex(olapTable.getId(), deleteIndexList);
}
/**
* Make sure we got db write lock before using this method.
* Up to here, table's state can only be NORMAL.
*
* @param mvName
* @param olapTable
*/
private void checkDropMaterializedView(String mvName, OlapTable olapTable)
throws DdlException, MetaNotFoundException {
if (mvName.equals(olapTable.getName())) {
throw new DdlException("Cannot drop base index by using DROP ROLLUP or DROP MATERIALIZED VIEW.");
}
if (!olapTable.hasMaterializedIndex(mvName)) {
throw new MetaNotFoundException(
"Materialized view [" + mvName + "] does not exist in table [" + olapTable.getName() + "]");
}
long mvIndexId = olapTable.getIndexIdByName(mvName);
int mvSchemaHash = olapTable.getSchemaHashByIndexId(mvIndexId);
Preconditions.checkState(mvSchemaHash != -1);
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex materializedIndex = partition.getIndex(mvIndexId);
Preconditions.checkNotNull(materializedIndex);
}
}
/**
* Return mv index id which has been dropped
*
* @param mvName
* @param olapTable
* @return
*/
private long dropMaterializedView(String mvName, OlapTable olapTable) {
long mvIndexId = olapTable.getIndexIdByName(mvName);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(mvIndexId);
// delete rollup index
partition.deleteRollupIndex(mvIndexId);
// remove tablets from inverted index
for (Tablet tablet : rollupIndex.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
}
}
olapTable.deleteIndexInfo(mvName);
try {
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(),
Env.getCurrentInternalCatalog().getDbOrDdlException(olapTable.getQualifiedDbName()).getId(),
olapTable.getId(), mvIndexId);
} catch (DdlException e) {
LOG.info("failed to clean stats for mv {} from {}", mvName, olapTable.getName(), e);
}
return mvIndexId;
}
public void replayDropRollup(DropInfo dropInfo, Env env) throws MetaNotFoundException {
long dbId = dropInfo.getDbId();
long tableId = dropInfo.getTableId();
long rollupIndexId = dropInfo.getIndexId();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = env.getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLock();
try {
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId);
// remove from inverted index
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);
olapTable.deleteIndexInfo(rollupIndexName);
env.getQueryStats().clear(env.getCurrentCatalog().getId(), db.getId(),
olapTable.getId(), rollupIndexId);
} finally {
olapTable.writeUnlock();
}
List<Long> deleteIndexList = new ArrayList<Long>();
deleteIndexList.add(rollupIndexId);
Env.getCurrentInternalCatalog().eraseDroppedIndex(olapTable.getId(), deleteIndexList);
LOG.info("replay drop rollup {}", dropInfo.getIndexId());
}
@Override
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
runAlterJobV2();
}
private Map<Long, AlterJobV2> getAlterJobsCopy() {
return new HashMap<>(alterJobsV2);
}
private void removeJobFromRunningQueue(AlterJobV2 alterJob) {
synchronized (tableRunningJobMap) {
Set<Long> runningJobIdSet = tableRunningJobMap.get(alterJob.getTableId());
if (runningJobIdSet != null) {
runningJobIdSet.remove(alterJob.getJobId());
if (runningJobIdSet.size() == 0) {
tableRunningJobMap.remove(alterJob.getTableId());
}
}
}
}
private void changeTableStatus(long dbId, long tableId, OlapTableState olapTableState) {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLockOrMetaException();
try {
if (olapTable.getState() == olapTableState) {
return;
}
olapTable.setState(olapTableState);
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] changing table status failed after rollup job done", e);
}
}
// replay the alter job v2
@Override
public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
super.replayAlterJobV2(alterJob);
if (!alterJob.isDone()) {
addAlterJobV2ToTableNotFinalStateJobMap(alterJob);
changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.ROLLUP);
LOG.info("set table's state to ROLLUP, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());
} else if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());
} else {
LOG.info("not set table's state, table id: {}, is job done: {}, job id: {}", alterJob.getTableId(),
alterJob.isDone(), alterJob.getJobId());
}
}
/**
* create tablet and alter tablet in be is thread safe,so we can run rollup job for one table concurrently
*/
private void runAlterJobWithConcurrencyLimit(RollupJobV2 rollupJobV2) {
if (rollupJobV2.isDone()) {
return;
}
if (rollupJobV2.isTimeout()) {
// in run(), the timeout job will be cancelled.
rollupJobV2.run();
return;
}
// check if rollup job can be run within limitation.
long tblId = rollupJobV2.getTableId();
long jobId = rollupJobV2.getJobId();
boolean shouldJobRun = false;
synchronized (tableRunningJobMap) {
Set<Long> tableRunningJobSet = tableRunningJobMap.get(tblId);
if (tableRunningJobSet == null) {
tableRunningJobSet = new HashSet<>();
tableRunningJobMap.put(tblId, tableRunningJobSet);
}
// current job is already in running
if (tableRunningJobSet.contains(jobId)) {
shouldJobRun = true;
} else if (tableRunningJobSet.size() < Config.max_running_rollup_job_num_per_table) {
// add current job to running queue
tableRunningJobSet.add(jobId);
shouldJobRun = true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("number of running alter job {} in table {} exceed limit {}. job {} is suspended",
tableRunningJobSet.size(), rollupJobV2.getTableId(),
Config.max_running_rollup_job_num_per_table, rollupJobV2.getJobId());
}
shouldJobRun = false;
}
}
if (shouldJobRun) {
rollupJobV2.run();
}
}
private void runAlterJobV2() {
for (Map.Entry<Long, AlterJobV2> entry : getAlterJobsCopy().entrySet()) {
RollupJobV2 alterJob = (RollupJobV2) entry.getValue();
// run alter job
runAlterJobWithConcurrencyLimit(alterJob);
// the following check should be right after job's running, so that the table's state
// can be changed to NORMAL immediately after the last alter job of the table is done.
//
// ATTN(cmy): there is still a short gap between "job finish" and "table become normal",
// so if user send next alter job right after the "job finish",
// it may encounter "table's state not NORMAL" error.
if (alterJob.isDone()) {
onJobDone(alterJob);
}
}
}
// remove job from running queue and state map, also set table's state to NORMAL if this is
// the last running job of the table.
private void onJobDone(AlterJobV2 alterJob) {
removeJobFromRunningQueue(alterJob);
if (removeAlterJobV2FromTableNotFinalStateJobMap(alterJob)) {
Env.getCurrentEnv().getGroupCommitManager().unblockTable(alterJob.getTableId());
changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());
}
}
@Override
public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
List<List<Comparable>> rollupJobInfos = new LinkedList<List<Comparable>>();
getAlterJobV2Infos(db, rollupJobInfos);
// sort by
// "JobId", "TableName", "CreateTime", "FinishedTime", "BaseIndexName", "RollupIndexName"
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4, 5);
Collections.sort(rollupJobInfos, comparator);
return rollupJobInfos;
}
public List<List<Comparable>> getAllAlterJobInfos() {
List<List<Comparable>> rollupJobInfos = new LinkedList<List<Comparable>>();
for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
// no need to check priv here. This method is only called in show proc stmt,
// which already check the ADMIN priv.
alterJob.getInfo(rollupJobInfos);
}
return rollupJobInfos;
}
private void getAlterJobV2Infos(Database db, List<List<Comparable>> rollupJobInfos) {
ConnectContext ctx = ConnectContext.get();
for (AlterJobV2 alterJob : alterJobsV2.values()) {
if (alterJob.getDbId() != db.getId()) {
continue;
}
if (ctx != null) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ctx, db.getCatalog().getName(), db.getFullName(),
alterJob.getTableName(), PrivPredicate.ALTER)) {
continue;
}
}
alterJob.getInfo(rollupJobInfos);
}
}
@Override
public void process(String rawSql, List<AlterClause> alterClauses, Database db,
OlapTable olapTable)
throws DdlException, AnalysisException, MetaNotFoundException {
if (olapTable.isDuplicateWithoutKey()) {
throw new DdlException("Duplicate table without keys do not support alter rollup!");
}
Optional<AlterClause> alterClauseOptional = alterClauses.stream().findAny();
if (alterClauseOptional.isPresent()) {
if (alterClauseOptional.get() instanceof AddRollupClause) {
processBatchAddRollup(rawSql, alterClauses, db, olapTable);
} else if (alterClauseOptional.get() instanceof DropRollupClause) {
processBatchDropRollup(alterClauses, db, olapTable);
} else {
Preconditions.checkState(false);
}
}
}
public void cancel(CancelAlterTableCommand command) throws DdlException {
String dbName = command.getDbName();
String tableName = command.getTableName();
Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
List<AlterJobV2> rollupJobV2List = new ArrayList<>();
OlapTable olapTable;
try {
olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new DdlException(e.getMessage());
}
olapTable.writeLock();
try {
if (olapTable.getState() != OlapTableState.ROLLUP
&& olapTable.getState() != OlapTableState.WAITING_STABLE) {
throw new DdlException("Table[" + tableName + "] is not under ROLLUP. "
+ "Use 'ALTER TABLE DROP ROLLUP' if you want to.");
}
// find from new alter jobs first
if (command.getAlterJobIdList() != null) {
for (Long jobId : command.getAlterJobIdList()) {
AlterJobV2 alterJobV2 = getUnfinishedAlterJobV2ByJobId(jobId);
if (alterJobV2 == null) {
continue;
}
rollupJobV2List.add(getUnfinishedAlterJobV2ByJobId(jobId));
}
} else {
rollupJobV2List = getUnfinishedAlterJobV2ByTableId(olapTable.getId());
}
if (rollupJobV2List.size() == 0) {
// Alter job v1 is not supported, delete related code
throw new DdlException("Table[" + tableName + "] is not under ROLLUP. Maybe it has old alter job");
}
} finally {
olapTable.writeUnlock();
}
// alter job v2's cancel must be called outside the table lock
if (rollupJobV2List.size() != 0) {
for (AlterJobV2 alterJobV2 : rollupJobV2List) {
alterJobV2.cancel("user cancelled");
if (alterJobV2.isDone()) {
onJobDone(alterJobV2);
}
}
}
}
@Override
public void processForNereids(String rawSql, List<AlterCommand> alterCommands, Database db,
OlapTable olapTable)
throws DdlException, AnalysisException, MetaNotFoundException {
// TODO: convert alterClauses to alterSystemCommands for mv
}
@Override
public void cancel(CancelStmt stmt) throws DdlException {
CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) stmt;
String dbName = cancelAlterTableStmt.getDbName();
String tableName = cancelAlterTableStmt.getTableName();
Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
List<AlterJobV2> rollupJobV2List = new ArrayList<>();
OlapTable olapTable;
try {
olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new DdlException(e.getMessage());
}
olapTable.writeLock();
try {
if (olapTable.getState() != OlapTableState.ROLLUP
&& olapTable.getState() != OlapTableState.WAITING_STABLE) {
throw new DdlException("Table[" + tableName + "] is not under ROLLUP. "
+ "Use 'ALTER TABLE DROP ROLLUP' if you want to.");
}
// find from new alter jobs first
if (cancelAlterTableStmt.getAlterJobIdList() != null) {
for (Long jobId : cancelAlterTableStmt.getAlterJobIdList()) {
AlterJobV2 alterJobV2 = getUnfinishedAlterJobV2ByJobId(jobId);
if (alterJobV2 == null) {
continue;
}
rollupJobV2List.add(getUnfinishedAlterJobV2ByJobId(jobId));
}
} else {
rollupJobV2List = getUnfinishedAlterJobV2ByTableId(olapTable.getId());
}
if (rollupJobV2List.size() == 0) {
// Alter job v1 is not supported, delete related code
throw new DdlException("Table[" + tableName + "] is not under ROLLUP. Maybe it has old alter job");
}
} finally {
olapTable.writeUnlock();
}
// alter job v2's cancel must be called outside the table lock
if (rollupJobV2List.size() != 0) {
for (AlterJobV2 alterJobV2 : rollupJobV2List) {
alterJobV2.cancel("user cancelled");
if (alterJobV2.isDone()) {
onJobDone(alterJobV2);
}
}
}
}
// just for ut
public Map<Long, Set<Long>> getTableRunningJobMap() {
return tableRunningJobMap;
}
}