Alter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterMultiPartitionClause;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropPartitionFromIndexClause;
import org.apache.doris.analysis.ModifyColumnCommentClause;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.ModifyEngineClause;
import org.apache.doris.analysis.ModifyPartitionClause;
import org.apache.doris.analysis.ModifyTableCommentClause;
import org.apache.doris.analysis.ModifyTablePropertiesClause;
import org.apache.doris.analysis.PartitionRenameClause;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.ReplaceTableClause;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.cloud.alter.CloudSchemaChangeHandler;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTableEngineOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TTabletType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
private AlterHandler schemaChangeHandler;
private AlterHandler materializedViewHandler;
private SystemHandler systemHandler;
public Alter() {
schemaChangeHandler = Config.isCloudMode() ? new CloudSchemaChangeHandler() : new SchemaChangeHandler();
materializedViewHandler = new MaterializedViewHandler();
systemHandler = new SystemHandler();
}
public void start() {
schemaChangeHandler.start();
materializedViewHandler.start();
systemHandler.start();
}
public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
throws DdlException, AnalysisException, MetaNotFoundException {
String tableName = stmt.getBaseIndexName();
// check db
String dbName = stmt.getDBName();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
Env.getCurrentInternalCatalog().checkAvailableCapacity(db);
OlapTable olapTable = (OlapTable) db.getNonTempTableOrMetaException(tableName, TableType.OLAP);
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
}
public void processCreateMaterializedView(CreateMaterializedViewCommand command)
throws DdlException, AnalysisException, MetaNotFoundException {
String tableName = command.getBaseIndexName();
// check db
String dbName = command.getDBName();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
Env.getCurrentInternalCatalog().checkAvailableCapacity(db);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP);
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(command, db, olapTable);
}
public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
TableName tableName = stmt.getTableName();
String dbName = tableName.getDb();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
String name = tableName.getTbl();
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(name, TableType.OLAP);
((MaterializedViewHandler) materializedViewHandler).processDropMaterializedView(stmt, db, olapTable);
}
private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, List<AlterClause> alterClauses,
Database db) throws UserException {
if (olapTable.getDataSortInfo() != null
&& olapTable.getDataSortInfo().getSortType() == TSortType.ZORDER) {
throw new UserException("z-order table can not support schema change!");
}
stmt.rewriteAlterClause(olapTable);
alterClauses.addAll(stmt.getOps());
return processAlterOlapTableInternal(alterClauses, olapTable, db, stmt.toSql());
}
private boolean processAlterOlapTable(AlterTableCommand command, OlapTable olapTable,
List<AlterClause> alterClauses,
Database db) throws UserException {
if (olapTable.getDataSortInfo() != null
&& olapTable.getDataSortInfo().getSortType() == TSortType.ZORDER) {
throw new UserException("z-order table can not support schema change!");
}
// check conflict alter ops first
alterClauses.addAll(command.getOps());
return processAlterOlapTableInternal(alterClauses, olapTable, db, command.toSql());
}
private boolean processAlterOlapTableInternal(List<AlterClause> alterClauses, OlapTable olapTable,
Database db, String sql) throws UserException {
if (olapTable.getDataSortInfo() != null
&& olapTable.getDataSortInfo().getSortType() == TSortType.ZORDER) {
throw new UserException("z-order table can not support schema change!");
}
// check conflict alter ops first
AlterOperations currentAlterOps = new AlterOperations();
currentAlterOps.checkConflict(alterClauses);
for (AlterClause clause : alterClauses) {
Map<String, String> properties = null;
try {
properties = clause.getProperties();
} catch (Exception e) {
continue;
}
if (properties != null && !properties.isEmpty()) {
checkNoForceProperty(properties);
}
}
if (olapTable instanceof MTMV) {
currentAlterOps.checkMTMVAllow(alterClauses);
}
// check cluster capacity and db quota, only need to check once.
if (currentAlterOps.needCheckCapacity()) {
Env.getCurrentInternalCatalog().checkAvailableCapacity(db);
}
olapTable.checkNormalStateForAlter();
boolean needProcessOutsideTableLock = false;
BaseTableInfo oldBaseTableInfo = new BaseTableInfo(olapTable);
Optional<BaseTableInfo> newBaseTableInfo = Optional.empty();
if (currentAlterOps.checkTableStoragePolicy(alterClauses)) {
String tableStoragePolicy = olapTable.getStoragePolicy();
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
// If the two policy has one same resource, then it's safe for the table to change policy
// There would only be the cooldown ttl or cooldown time would be affected
if (!Env.getCurrentEnv().getPolicyMgr()
.checkStoragePolicyIfSameResource(tableStoragePolicy, currentStoragePolicy)
&& !tableStoragePolicy.isEmpty()) {
for (Partition partition : olapTable.getAllPartitions()) {
if (Partition.PARTITION_INIT_VERSION < partition.getVisibleVersion()) {
throw new DdlException("Do not support alter table's storage policy , this table ["
+ olapTable.getName() + "] has storage policy " + tableStoragePolicy
+ ", the table need to be empty.");
}
}
}
// check currentStoragePolicy resource exist.
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
boolean enableUniqueKeyMergeOnWrite;
olapTable.readLock();
try {
enableUniqueKeyMergeOnWrite = olapTable.getEnableUniqueKeyMergeOnWrite();
} finally {
olapTable.readUnlock();
}
// must check here whether you can set the policy, otherwise there will be inconsistent metadata
if (enableUniqueKeyMergeOnWrite && !Strings.isNullOrEmpty(currentStoragePolicy)) {
throw new UserException(
"Can not set UNIQUE KEY table that enables Merge-On-write"
+ " with storage policy(" + currentStoragePolicy + ")");
}
olapTable.setStoragePolicy(currentStoragePolicy);
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkMinLoadReplicaNum(alterClauses)) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
processModifyMinLoadReplicaNum(db, olapTable, alterClause);
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
if (!Config.enable_feature_binlog) {
throw new DdlException("Binlog feature is not enabled");
}
// TODO(Drogon): check error
((SchemaChangeHandler) schemaChangeHandler).updateBinlogConfig(db, olapTable, alterClauses);
} else if (currentAlterOps.hasSchemaChangeOp()) {
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
schemaChangeHandler.process(sql, alterClauses, db, olapTable);
} else if (currentAlterOps.hasRollupOp()) {
materializedViewHandler.process(alterClauses, db, olapTable);
} else if (currentAlterOps.hasPartitionOp()) {
Preconditions.checkState(!alterClauses.isEmpty());
for (AlterClause alterClause : alterClauses) {
olapTable.writeLockOrDdlException();
try {
if (alterClause instanceof DropPartitionClause) {
if (!((DropPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(olapTable);
}
Env.getCurrentEnv().dropPartition(db, olapTable, ((DropPartitionClause) alterClause));
} else if (alterClause instanceof ReplacePartitionClause) {
Env.getCurrentEnv().replaceTempPartition(db, olapTable, (ReplacePartitionClause) alterClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
// expand the partition names if it is 'Modify Partition(*)'
if (clause.isNeedExpand()) {
List<String> partitionNames = clause.getPartitionNames();
partitionNames.clear();
for (Partition partition : olapTable.getPartitions()) {
partitionNames.add(partition.getName());
}
}
Map<String, String> properties = clause.getProperties();
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
boolean isInMemory =
Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_INMEMORY));
if (isInMemory) {
throw new UserException("Not support set 'in_memory'='true' now!");
}
needProcessOutsideTableLock = true;
} else {
List<String> partitionNames = clause.getPartitionNames();
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
modifyPartitionsProperty(db, olapTable, partitionNames, properties,
clause.isTempPartition());
} else {
needProcessOutsideTableLock = true;
}
}
} else if (alterClause instanceof DropPartitionFromIndexClause) {
// do nothing
} else if (alterClause instanceof AddPartitionClause
|| alterClause instanceof AddPartitionLikeClause
|| alterClause instanceof AlterMultiPartitionClause) {
needProcessOutsideTableLock = true;
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} finally {
olapTable.writeUnlock();
}
}
} else if (currentAlterOps.hasRenameOp()) {
processRename(db, olapTable, alterClauses);
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
} else if (currentAlterOps.hasReplaceTableOp()) {
processReplaceTable(db, olapTable, alterClauses);
// after replace table, olapTable may still be old name, so need set it to new name
ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0);
String newTblName = clause.getTblName();
newBaseTableInfo = Optional.of(new BaseTableInfo(olapTable));
newBaseTableInfo.get().setTableName(newTblName);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.contains(AlterOpType.MODIFY_DISTRIBUTION)) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Env.getCurrentEnv()
.modifyDefaultDistributionBucketNum(db, olapTable, (ModifyDistributionClause) alterClause);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_COLUMN_COMMENT)) {
processModifyColumnComment(db, olapTable, alterClauses);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_COMMENT)) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
processModifyTableComment(db, olapTable, alterClause);
} else {
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}
if (needChangeMTMVState(alterClauses)) {
Env.getCurrentEnv().getMtmvService()
.alterTable(oldBaseTableInfo, newBaseTableInfo, currentAlterOps.hasReplaceTableOp());
}
return needProcessOutsideTableLock;
}
private void setExternalTableAutoAnalyzePolicy(ExternalTable table, List<AlterClause> alterClauses) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Preconditions.checkState(alterClause instanceof ModifyTablePropertiesClause);
Map<String, String> properties = alterClause.getProperties();
Preconditions.checkState(properties.size() == 1);
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY));
String value = properties.get(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY);
Preconditions.checkState(PropertyAnalyzer.ENABLE_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value)
|| PropertyAnalyzer.DISABLE_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value)
|| PropertyAnalyzer.USE_CATALOG_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value));
value = value.equalsIgnoreCase(PropertyAnalyzer.USE_CATALOG_AUTO_ANALYZE_POLICY) ? null : value;
table.getCatalog().setAutoAnalyzePolicy(table.getDatabase().getFullName(), table.getName(), value);
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(table.getCatalog().getName(),
table.getDatabase().getFullName(), table.getName(), properties);
Env.getCurrentEnv().getEditLog().logModifyTableProperties(info);
}
private boolean needChangeMTMVState(List<AlterClause> alterClauses) {
for (AlterClause alterClause : alterClauses) {
if (alterClause.needChangeMTMVState()) {
return true;
}
}
return false;
}
private void processModifyTableComment(Database db, OlapTable tbl, AlterClause alterClause)
throws DdlException {
tbl.writeLockOrDdlException();
try {
ModifyTableCommentClause clause = (ModifyTableCommentClause) alterClause;
tbl.setComment(clause.getComment());
// log
ModifyCommentOperationLog op = ModifyCommentOperationLog
.forTable(db.getId(), tbl.getId(), clause.getComment());
Env.getCurrentEnv().getEditLog().logModifyComment(op);
} finally {
tbl.writeUnlock();
}
}
private void processModifyColumnComment(Database db, OlapTable tbl, List<AlterClause> alterClauses)
throws DdlException {
tbl.writeLockOrDdlException();
try {
// check first
Map<String, String> colToComment = Maps.newHashMap();
for (AlterClause alterClause : alterClauses) {
Preconditions.checkState(alterClause instanceof ModifyColumnCommentClause);
ModifyColumnCommentClause clause = (ModifyColumnCommentClause) alterClause;
String colName = clause.getColName();
if (tbl.getColumn(colName) == null) {
throw new DdlException("Unknown column: " + colName);
}
if (colToComment.containsKey(colName)) {
throw new DdlException("Duplicate column: " + colName);
}
colToComment.put(colName, clause.getComment());
}
// modify comment
for (Map.Entry<String, String> entry : colToComment.entrySet()) {
Column col = tbl.getColumn(entry.getKey());
col.setComment(entry.getValue());
}
// log
ModifyCommentOperationLog op = ModifyCommentOperationLog.forColumn(db.getId(), tbl.getId(), colToComment);
Env.getCurrentEnv().getEditLog().logModifyComment(op);
} finally {
tbl.writeUnlock();
}
}
public void replayModifyComment(ModifyCommentOperationLog operation) throws MetaNotFoundException {
long dbId = operation.getDbId();
long tblId = operation.getTblId();
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
Table tbl = db.getTableOrMetaException(tblId);
tbl.writeLock();
try {
ModifyCommentOperationLog.Type type = operation.getType();
switch (type) {
case TABLE:
tbl.setComment(operation.getTblComment());
break;
case COLUMN:
for (Map.Entry<String, String> entry : operation.getColToComment().entrySet()) {
tbl.getColumn(entry.getKey()).setComment(entry.getValue());
}
break;
default:
break;
}
} finally {
tbl.writeUnlock();
}
}
private void processAlterExternalTable(AlterTableStmt stmt, Table externalTable, Database db)
throws UserException {
stmt.checkExternalTableOperationAllow(externalTable);
processAlterExternalTableInternal(stmt.getOps(), externalTable, db);
}
private void processAlterExternalTable(AlterTableCommand command, Table externalTable, Database db)
throws UserException {
List<AlterClause> alterClauses = new ArrayList<>();
alterClauses.addAll(command.getOps());
processAlterExternalTableInternal(alterClauses, externalTable, db);
}
private void processAlterExternalTableInternal(List<AlterClause> alterClauses, Table externalTable, Database db)
throws UserException {
// check conflict alter ops first
AlterOperations currentAlterOps = new AlterOperations();
currentAlterOps.checkConflict(alterClauses);
if (currentAlterOps.hasRenameOp()) {
processRename(db, externalTable, alterClauses);
} else if (currentAlterOps.hasSchemaChangeOp()) {
schemaChangeHandler.processExternalTable(alterClauses, db, externalTable);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_ENGINE)) {
ModifyEngineClause modifyEngineClause = (ModifyEngineClause) alterClauses.get(0);
processModifyEngine(db, externalTable, modifyEngineClause);
}
}
public void processModifyEngine(Database db, Table externalTable, ModifyEngineClause clause) throws DdlException {
externalTable.writeLockOrDdlException();
try {
if (externalTable.getType() != TableType.MYSQL) {
throw new DdlException("Only support modify table engine from MySQL to ODBC");
}
processModifyEngineInternal(db, externalTable, clause.getProperties(), false);
} finally {
externalTable.writeUnlock();
}
LOG.info("modify table {}'s engine from MySQL to ODBC", externalTable.getName());
}
public void replayProcessModifyEngine(ModifyTableEngineOperationLog log) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(log.getDbId());
if (db == null) {
return;
}
MysqlTable mysqlTable = (MysqlTable) db.getTableNullable(log.getTableId());
if (mysqlTable == null) {
return;
}
mysqlTable.writeLock();
try {
processModifyEngineInternal(db, mysqlTable, log.getProperties(), true);
} finally {
mysqlTable.writeUnlock();
}
}
private void processModifyEngineInternal(Database db, Table externalTable,
Map<String, String> prop, boolean isReplay) {
MysqlTable mysqlTable = (MysqlTable) externalTable;
Map<String, String> newProp = Maps.newHashMap(prop);
newProp.put(OdbcTable.ODBC_HOST, mysqlTable.getHost());
newProp.put(OdbcTable.ODBC_PORT, mysqlTable.getPort());
newProp.put(OdbcTable.ODBC_USER, mysqlTable.getUserName());
newProp.put(OdbcTable.ODBC_PASSWORD, mysqlTable.getPasswd());
newProp.put(OdbcTable.ODBC_DATABASE, mysqlTable.getMysqlDatabaseName());
newProp.put(OdbcTable.ODBC_TABLE, mysqlTable.getMysqlTableName());
newProp.put(OdbcTable.ODBC_TYPE, TOdbcTableType.MYSQL.name());
// create a new odbc table with same id and name
OdbcTable odbcTable = null;
try {
odbcTable = new OdbcTable(mysqlTable.getId(), mysqlTable.getName(), mysqlTable.getBaseSchema(), newProp);
} catch (DdlException e) {
LOG.warn("Should not happen", e);
return;
}
odbcTable.writeLock();
try {
db.unregisterTable(mysqlTable.getName());
db.registerTable(odbcTable);
if (!isReplay) {
ModifyTableEngineOperationLog log = new ModifyTableEngineOperationLog(db.getId(),
externalTable.getId(), prop);
Env.getCurrentEnv().getEditLog().logModifyTableEngine(log);
}
} finally {
odbcTable.writeUnlock();
}
}
public void processAlterTable(AlterTableStmt stmt) throws UserException {
TableName dbTableName = stmt.getTbl();
String ctlName = dbTableName.getCtl();
String dbName = dbTableName.getDb();
String tableName = dbTableName.getTbl();
DatabaseIf dbIf = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(ctlName, catalog -> new DdlException("Unknown catalog " + catalog))
.getDbOrDdlException(dbName);
TableIf tableIf = dbIf.getTableOrDdlException(tableName);
List<AlterClause> alterClauses = Lists.newArrayList();
// some operations will take long time to process, need to be done outside the table lock
boolean needProcessOutsideTableLock = false;
switch (tableIf.getType()) {
case MATERIALIZED_VIEW:
case OLAP:
if (tableIf.isTemporary()) {
throw new DdlException("Do not support alter temporary table[" + tableName + "]");
}
OlapTable olapTable = (OlapTable) tableIf;
needProcessOutsideTableLock = processAlterOlapTable(stmt, olapTable, alterClauses, (Database) dbIf);
break;
case ODBC:
case JDBC:
case HIVE:
case MYSQL:
case ELASTICSEARCH:
processAlterExternalTable(stmt, (Table) tableIf, (Database) dbIf);
return;
case HMS_EXTERNAL_TABLE:
case JDBC_EXTERNAL_TABLE:
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
case HUDI_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
alterClauses.addAll(stmt.getOps());
setExternalTableAutoAnalyzePolicy((ExternalTable) tableIf, alterClauses);
return;
default:
throw new DdlException("Do not support alter "
+ tableIf.getType().toString() + " table[" + tableName + "]");
}
Database db = (Database) dbIf;
// the following ops should done outside table lock. because it contain synchronized create operation
if (needProcessOutsideTableLock) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddPartitionClause) {
if (!((AddPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
} else if (alterClause instanceof AddPartitionLikeClause) {
if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addPartitionLike(db, tableName, (AddPartitionLikeClause) alterClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
Map<String, String> properties = clause.getProperties();
List<String> partitionNames = clause.getPartitionNames();
((SchemaChangeHandler) schemaChangeHandler).updatePartitionsProperties(
db, tableName, partitionNames, properties);
OlapTable olapTable = (OlapTable) tableIf;
olapTable.writeLockOrDdlException();
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties, clause.isTempPartition());
} finally {
olapTable.writeUnlock();
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties);
} else if (alterClause instanceof AlterMultiPartitionClause) {
if (!((AlterMultiPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addMultiPartitions(db, tableName, (AlterMultiPartitionClause) alterClause);
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
}
}
public void processAlterTable(AlterTableCommand command) throws UserException {
TableNameInfo dbTableName = command.getTbl();
String ctlName = dbTableName.getCtl();
String dbName = dbTableName.getDb();
String tableName = dbTableName.getTbl();
DatabaseIf dbIf = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(ctlName, catalog -> new DdlException("Unknown catalog " + catalog))
.getDbOrDdlException(dbName);
TableIf tableIf = dbIf.getTableOrDdlException(tableName);
List<AlterClause> alterClauses = Lists.newArrayList();
// some operations will take long time to process, need to be done outside the table lock
boolean needProcessOutsideTableLock = false;
switch (tableIf.getType()) {
case MATERIALIZED_VIEW:
case OLAP:
OlapTable olapTable = (OlapTable) tableIf;
needProcessOutsideTableLock = processAlterOlapTable(command, olapTable, alterClauses, (Database) dbIf);
break;
case ODBC:
case JDBC:
case HIVE:
case MYSQL:
case ELASTICSEARCH:
processAlterExternalTable(command, (Table) tableIf, (Database) dbIf);
return;
case HMS_EXTERNAL_TABLE:
case JDBC_EXTERNAL_TABLE:
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
case HUDI_EXTERNAL_TABLE:
case TRINO_CONNECTOR_EXTERNAL_TABLE:
alterClauses.addAll(command.getOps());
setExternalTableAutoAnalyzePolicy((ExternalTable) tableIf, alterClauses);
return;
default:
throw new DdlException("Do not support alter "
+ tableIf.getType().toString() + " table[" + tableName + "]");
}
Database db = (Database) dbIf;
// the following ops should done outside table lock. because it contain synchronized create operation
if (needProcessOutsideTableLock) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddPartitionClause) {
if (!((AddPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionClause) alterClause, false, 0, true);
} else if (alterClause instanceof AddPartitionLikeClause) {
if (!((AddPartitionLikeClause) alterClause).getIsTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addPartitionLike(db, tableName, (AddPartitionLikeClause) alterClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
Map<String, String> properties = clause.getProperties();
List<String> partitionNames = clause.getPartitionNames();
((SchemaChangeHandler) schemaChangeHandler).updatePartitionsProperties(
db, tableName, partitionNames, properties);
OlapTable olapTable = (OlapTable) tableIf;
olapTable.writeLockOrDdlException();
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties, clause.isTempPartition());
} finally {
olapTable.writeUnlock();
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties);
} else if (alterClause instanceof AlterMultiPartitionClause) {
if (!((AlterMultiPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
}
Env.getCurrentEnv().addMultiPartitions(db, tableName, (AlterMultiPartitionClause) alterClause);
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
}
}
// entry of processing replace table
private void processReplaceTable(Database db, OlapTable origTable, List<AlterClause> alterClauses)
throws UserException {
ReplaceTableClause clause = (ReplaceTableClause) alterClauses.get(0);
String newTblName = clause.getTblName();
Table newTable = db.getTableOrMetaException(newTblName);
if (newTable.getType() == TableType.MATERIALIZED_VIEW) {
throw new DdlException("replace table[" + newTblName + "] cannot be a materialized view");
}
boolean swapTable = clause.isSwapTable();
boolean isForce = clause.isForce();
processReplaceTable(db, origTable, newTblName, swapTable, isForce);
}
public void processReplaceTable(Database db, OlapTable origTable, String newTblName,
boolean swapTable, boolean isForce)
throws UserException {
db.writeLockOrDdlException();
try {
List<TableType> tableTypes = Lists.newArrayList(TableType.OLAP, TableType.MATERIALIZED_VIEW);
Table newTbl = db.getTableOrMetaException(newTblName, tableTypes);
if (newTbl.isTemporary()) {
throw new UserException("Do not support replace with temporary table");
}
OlapTable olapNewTbl = (OlapTable) newTbl;
List<Table> tableList = Lists.newArrayList(origTable, newTbl);
tableList.sort((Comparator.comparing(Table::getId)));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
String oldTblName = origTable.getName();
// First, we need to check whether the table to be operated on can be renamed
olapNewTbl.checkAndSetName(oldTblName, true);
if (swapTable) {
origTable.checkAndSetName(newTblName, true);
}
replaceTableInternal(db, origTable, olapNewTbl, swapTable, false, isForce);
// write edit log
ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(),
origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable, isForce);
Env.getCurrentEnv().getEditLog().logReplaceTable(log);
LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
} finally {
db.writeUnlock();
}
}
public void replayReplaceTable(ReplaceTableOperationLog log) throws MetaNotFoundException {
long dbId = log.getDbId();
long origTblId = log.getOrigTblId();
long newTblId = log.getNewTblId();
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
List<TableType> tableTypes = Lists.newArrayList(TableType.OLAP, TableType.MATERIALIZED_VIEW);
OlapTable origTable = (OlapTable) db.getTableOrMetaException(origTblId, tableTypes);
OlapTable newTbl = (OlapTable) db.getTableOrMetaException(newTblId, tableTypes);
List<Table> tableList = Lists.newArrayList(origTable, newTbl);
tableList.sort((Comparator.comparing(Table::getId)));
MetaLockUtils.writeLockTablesOrMetaException(tableList);
try {
replaceTableInternal(db, origTable, newTbl, log.isSwapTable(), true, log.isForce());
} catch (DdlException e) {
LOG.warn("should not happen", e);
} finally {
MetaLockUtils.writeUnlockTables(tableList);
}
LOG.info("finish replay replacing table {} with table {}, is swap: {}", origTblId, newTblId, log.isSwapTable());
}
/**
* The replace table operation works as follow:
* For example, REPLACE TABLE A WITH TABLE B.
* <p>
* 1. If "swapTable" is true, A will be renamed to B, and B will be renamed to A
* 1.1 check if A can be renamed to B (checking name conflict, etc...)
* 1.2 check if B can be renamed to A (checking name conflict, etc...)
* 1.3 rename B to A, drop old A, and add new A to database.
* 1.4 rename A to B, drop old B, and add new B to database.
* <p>
* 2. If "swapTable" is false, A will be dropped, and B will be renamed to A
* 1.1 check if B can be renamed to A (checking name conflict, etc...)
* 1.2 rename B to A, drop old A, and add new A to database.
*/
private void replaceTableInternal(Database db, OlapTable origTable, OlapTable newTbl, boolean swapTable,
boolean isReplay, boolean isForce)
throws DdlException {
String oldTblName = origTable.getName();
String newTblName = newTbl.getName();
// drop origin table and new table
db.unregisterTable(oldTblName);
db.unregisterTable(newTblName);
// rename new table name to origin table name and add it to database
newTbl.checkAndSetName(oldTblName, false);
db.registerTable(newTbl);
if (swapTable) {
// rename origin table name to new table name and add it to database
origTable.checkAndSetName(newTblName, false);
db.registerTable(origTable);
} else {
// not swap, the origin table is not used anymore, need to drop all its tablets.
// put original table to recycle bin.
if (isForce) {
Env.getCurrentEnv().onEraseOlapTable(origTable, isReplay);
} else {
Env.getCurrentRecycleBin().recycleTable(db.getId(), origTable, isReplay, isForce, 0);
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(origTable.getId());
if (origTable instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().dropJob((MTMV) origTable, isReplay);
}
}
}
public void processAlterView(AlterViewStmt stmt, ConnectContext ctx) throws UserException {
TableName dbTableName = stmt.getTbl();
String dbName = dbTableName.getDb();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
String tableName = dbTableName.getTbl();
View view = (View) db.getTableOrMetaException(tableName, TableType.VIEW);
modifyViewDef(db, view, stmt.getInlineViewDef(), ctx.getSessionVariable().getSqlMode(),
stmt.getColumns(), stmt.getComment());
}
private void modifyViewDef(Database db, View view, String inlineViewDef, long sqlMode,
List<Column> newFullSchema, String comment) throws DdlException {
db.writeLockOrDdlException();
try {
view.writeLockOrDdlException();
try {
if (comment != null) {
view.setComment(comment);
}
// when do alter view modify comment, inlineViewDef and newFullSchema will be empty.
if (!Strings.isNullOrEmpty(inlineViewDef)) {
view.setInlineViewDefWithSqlMode(inlineViewDef, sqlMode);
view.setNewFullSchema(newFullSchema);
}
String viewName = view.getName();
db.unregisterTable(viewName);
db.registerTable(view);
AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(),
inlineViewDef, newFullSchema, sqlMode, comment);
Env.getCurrentEnv().getEditLog().logModifyViewDef(alterViewInfo);
LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
view.writeUnlock();
}
} finally {
db.writeUnlock();
}
}
public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFoundException, DdlException {
long dbId = alterViewInfo.getDbId();
long tableId = alterViewInfo.getTableId();
String inlineViewDef = alterViewInfo.getInlineViewDef();
List<Column> newFullSchema = alterViewInfo.getNewFullSchema();
String comment = alterViewInfo.getComment();
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
View view = (View) db.getTableOrMetaException(tableId, TableType.VIEW);
db.writeLock();
view.writeLock();
try {
String viewName = view.getName();
if (comment != null) {
view.setComment(comment);
} else {
view.setInlineViewDefWithSqlMode(inlineViewDef, alterViewInfo.getSqlMode());
view.setNewFullSchema(newFullSchema);
}
// We do not need to init view here.
// During the `init` phase, some `Alter-View` statements will access the remote file system,
// but they should not access it during the metadata replay phase.
db.unregisterTable(viewName);
db.registerTable(view);
LOG.info("replay modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
view.writeUnlock();
db.writeUnlock();
}
}
public void processAlterSystem(AlterSystemStmt stmt) throws UserException {
systemHandler.process(Collections.singletonList(stmt.getAlterClause()), null, null);
}
public void processAlterSystem(AlterSystemCommand command) throws UserException {
systemHandler.processForNereids(Collections.singletonList(command), null, null);
}
private void processRename(Database db, OlapTable table, List<AlterClause> alterClauses) throws DdlException {
for (AlterClause alterClause : alterClauses) {
if (alterClause instanceof TableRenameClause) {
Env.getCurrentEnv().renameTable(db, table, (TableRenameClause) alterClause);
break;
} else {
if (alterClause instanceof RollupRenameClause) {
Env.getCurrentEnv().renameRollup(db, table, (RollupRenameClause) alterClause);
break;
} else if (alterClause instanceof PartitionRenameClause) {
Env.getCurrentEnv().renamePartition(db, table, (PartitionRenameClause) alterClause);
break;
} else if (alterClause instanceof ColumnRenameClause) {
Env.getCurrentEnv().renameColumn(db, table, (ColumnRenameClause) alterClause);
break;
} else {
Preconditions.checkState(false);
}
}
}
}
private void processRename(Database db, Table table, List<AlterClause> alterClauses) throws DdlException {
for (AlterClause alterClause : alterClauses) {
if (alterClause instanceof TableRenameClause) {
Env.getCurrentEnv().renameTable(db, table, (TableRenameClause) alterClause);
break;
} else {
Preconditions.checkState(false);
}
}
}
/**
* Batch update partitions' properties
* caller should hold the table lock
*/
public void modifyPartitionsProperty(Database db,
OlapTable olapTable,
List<String> partitionNames,
Map<String, String> properties,
boolean isTempPartition)
throws DdlException, AnalysisException {
checkNoForceProperty(properties);
Preconditions.checkArgument(olapTable.isWriteLockHeldByCurrentThread());
List<ModifyPartitionInfo> modifyPartitionInfos = Lists.newArrayList();
olapTable.checkNormalStateForAlter();
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName, isTempPartition);
if (partition == null) {
throw new DdlException(
"Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
}
}
boolean hasInMemory = false;
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
hasInMemory = true;
}
// get value from properties here
// 1. replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
if (!replicaAlloc.isNotSet()) {
olapTable.checkChangeReplicaAllocation();
}
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
// 2. in memory
boolean newInMemory = PropertyAnalyzer.analyzeBooleanProp(properties,
PropertyAnalyzer.PROPERTIES_INMEMORY, false);
// 3. tablet type
TTabletType tTabletType =
PropertyAnalyzer.analyzeTabletType(properties);
// modify meta here
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, Long> tableBeToReplicaNumMap = Maps.newHashMap();
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName, isTempPartition);
Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition);
for (Map.Entry<Long, Long> entry : partitionBeToReplicaNumMap.entrySet()) {
tableBeToReplicaNumMap.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName, isTempPartition);
// 4. data property
// 4.1 get old data property from partition
DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId());
Map<String, String> modifiedProperties = Maps.newHashMap();
modifiedProperties.putAll(properties);
// 4.3 modify partition storage policy
// can set multi times storage policy
String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
if (!currentStoragePolicy.equals("")) {
// check currentStoragePolicy resource exist.
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
partitionInfo.setStoragePolicy(partition.getId(), currentStoragePolicy);
} else {
// if current partition is already in remote storage
if (partition.getRemoteDataSize() > 0) {
throw new AnalysisException(
"Cannot cancel storage policy for partition which is already on cold storage.");
}
// if current partition will be cooldown in 20s later
StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(dataProperty.getStoragePolicy());
StoragePolicy policy = (StoragePolicy) Env.getCurrentEnv().getPolicyMgr()
.getPolicy(checkedPolicyCondition);
if (policy != null) {
long latestTime = policy.getCooldownTimestampMs() > 0 ? policy.getCooldownTimestampMs()
: Long.MAX_VALUE;
if (policy.getCooldownTtl() > 0) {
latestTime = Math.min(latestTime,
partition.getVisibleVersionTime() + policy.getCooldownTtl() * 1000);
}
if (latestTime < System.currentTimeMillis() + 20 * 1000) {
throw new AnalysisException(
"Cannot cancel storage policy for partition which already be cooldown"
+ " or will be cooldown soon later");
}
}
partitionInfo.setStoragePolicy(partition.getId(), "");
}
// 4.4 analyze new properties
DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(modifiedProperties, dataProperty);
// 1. date property
if (newDataProperty != null) {
partitionInfo.setDataProperty(partition.getId(), newDataProperty);
}
// 2. replica allocation
if (!replicaAlloc.isNotSet()) {
if (Config.isNotCloudMode() && !olapTable.isColocateTable()) {
setReplicasToDrop(partition, partitionInfo.getReplicaAllocation(partition.getId()),
replicaAlloc, tableBeToReplicaNumMap);
}
partitionInfo.setReplicaAllocation(partition.getId(), replicaAlloc);
}
// 3. in memory
boolean oldInMemory = partitionInfo.getIsInMemory(partition.getId());
if (hasInMemory && (newInMemory != oldInMemory)) {
partitionInfo.setIsInMemory(partition.getId(), newInMemory);
}
// 4. tablet type
if (tTabletType != partitionInfo.getTabletType(partition.getId())) {
partitionInfo.setTabletType(partition.getId(), tTabletType);
}
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(), partition.getId(),
newDataProperty, replicaAlloc, hasInMemory ? newInMemory : oldInMemory, currentStoragePolicy,
Maps.newHashMap());
modifyPartitionInfos.add(info);
}
// log here
BatchModifyPartitionsInfo info = new BatchModifyPartitionsInfo(modifyPartitionInfos);
Env.getCurrentEnv().getEditLog().logBatchModifyPartition(info);
}
public void setReplicasToDrop(Partition partition,
ReplicaAllocation oldReplicaAlloc,
ReplicaAllocation newReplicaAlloc,
Map<Long, Long> tableBeToReplicaNumMap) {
if (newReplicaAlloc.getAllocMap().entrySet().stream().noneMatch(
entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey()))) {
return;
}
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
List<Long> aliveBes = systemInfoService.getAllBackendIds(true);
processReplicasInPartition(partition,
tableBeToReplicaNumMap, systemInfoService, oldReplicaAlloc, newReplicaAlloc, aliveBes);
}
private void processReplicasInPartition(Partition partition,
Map<Long, Long> tableBeToReplicaNumMap, SystemInfoService systemInfoService,
ReplicaAllocation oldReplicaAlloc, ReplicaAllocation newReplicaAlloc,
List<Long> aliveBes) {
List<Tag> changeTags = newReplicaAlloc.getAllocMap().entrySet().stream()
.filter(entry -> entry.getValue() < oldReplicaAlloc.getReplicaNumByTag(entry.getKey()))
.map(Map.Entry::getKey).collect(Collectors.toList());
Map<Long, Long> partitionBeToReplicaNumMap = getReplicaCountByBackend(partition);
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
if (!isTabletHealthy(tablet, systemInfoService, partition, oldReplicaAlloc, aliveBes)) {
continue;
}
Map<Tag, List<Replica>> tagToReplicaMap = getReplicasWithTag(tablet);
for (Tag tag : changeTags) {
List<Replica> toDealReplicas = tagToReplicaMap.get(tag);
if (toDealReplicas == null || toDealReplicas.isEmpty()) {
continue;
}
sortReplicasByBackendCount(toDealReplicas, tableBeToReplicaNumMap, partitionBeToReplicaNumMap);
int replicasToDrop = oldReplicaAlloc.getReplicaNumByTag(tag)
- newReplicaAlloc.getReplicaNumByTag(tag);
markReplicasForDropping(toDealReplicas, replicasToDrop,
tableBeToReplicaNumMap, partitionBeToReplicaNumMap);
}
}
}
}
private boolean isTabletHealthy(Tablet tablet, SystemInfoService systemInfoService,
Partition partition, ReplicaAllocation oldReplicaAlloc,
List<Long> aliveBes) {
return tablet.getHealth(systemInfoService, partition.getVisibleVersion(), oldReplicaAlloc, aliveBes)
.status == Tablet.TabletStatus.HEALTHY;
}
private Map<Tag, List<Replica>> getReplicasWithTag(Tablet tablet) {
return tablet.getReplicas().stream()
.collect(Collectors.groupingBy(replica -> Env.getCurrentSystemInfo()
.getBackend(replica.getBackendIdWithoutException()).getLocationTag()));
}
private void sortReplicasByBackendCount(List<Replica> replicas,
Map<Long, Long> tableBeToReplicaNumMap,
Map<Long, Long> partitionBeToReplicaNumMap) {
replicas.sort((Replica r1, Replica r2) -> {
long countPartition1 = partitionBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L);
long countPartition2 = partitionBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L);
if (countPartition1 != countPartition2) {
return Long.compare(countPartition2, countPartition1);
}
long countTable1 = tableBeToReplicaNumMap.getOrDefault(r1.getBackendIdWithoutException(), 0L);
long countTable2 = tableBeToReplicaNumMap.getOrDefault(r2.getBackendIdWithoutException(), 0L);
return Long.compare(countTable2, countTable1); // desc sort
});
}
private void markReplicasForDropping(List<Replica> replicas, int replicasToDrop,
Map<Long, Long> tableBeToReplicaNumMap,
Map<Long, Long> partitionBeToReplicaNumMap) {
for (int i = 0; i < replicas.size(); i++) {
Replica r = replicas.get(i);
long beId = r.getBackendIdWithoutException();
if (i >= replicasToDrop) {
r.setScaleInDropTimeStamp(-1); // Mark for not dropping
} else {
r.setScaleInDropTimeStamp(System.currentTimeMillis()); // Mark for dropping
tableBeToReplicaNumMap.put(beId, tableBeToReplicaNumMap.get(beId) - 1);
partitionBeToReplicaNumMap.put(beId, partitionBeToReplicaNumMap.get(beId) - 1);
}
}
}
public static Map<Long, Long> getReplicaCountByBackend(Partition partition) {
return partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE).stream()
.flatMap(index -> index.getTablets().stream())
.flatMap(tablet -> tablet.getBackendIds().stream())
.collect(Collectors.groupingBy(id -> id, Collectors.counting()));
}
public void checkNoForceProperty(Map<String, String> properties) throws DdlException {
for (RewriteProperty property : PropertyAnalyzer.getInstance().getForceProperties()) {
if (properties.containsKey(property.key())) {
throw new DdlException("Cann't modify property '" + property.key() + "'"
+ (Config.isCloudMode() ? " in cloud mode" : "") + ".");
}
}
}
public void replayModifyPartition(ModifyPartitionInfo info) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getDbId());
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
olapTable.writeLock();
try {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (info.getDataProperty() != null) {
partitionInfo.setDataProperty(info.getPartitionId(), info.getDataProperty());
}
if (!info.getReplicaAlloc().isNotSet()) {
partitionInfo.setReplicaAllocation(info.getPartitionId(), info.getReplicaAlloc());
}
Optional.ofNullable(info.getStoragePolicy()).filter(p -> !p.isEmpty())
.ifPresent(p -> partitionInfo.setStoragePolicy(info.getPartitionId(), p));
partitionInfo.setIsInMemory(info.getPartitionId(), info.isInMemory());
Map<String, String> tblProperties = info.getTblProperties();
if (tblProperties != null && !tblProperties.isEmpty()) {
olapTable.setReplicaAllocation(tblProperties);
}
} finally {
olapTable.writeUnlock();
}
}
private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, AlterClause alterClause)
throws DdlException {
Map<String, String> properties = alterClause.getProperties();
short minLoadReplicaNum = -1;
try {
minLoadReplicaNum = PropertyAnalyzer.analyzeMinLoadReplicaNum(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "default replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
if (olapTable.dynamicPartitionExists()) {
replicaAlloc = olapTable.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation();
if (!replicaAlloc.isNotSet() && minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
throw new DdlException("Failed to check min load replica num [" + minLoadReplicaNum + "] <= "
+ "dynamic partition replica num [" + replicaAlloc.getTotalReplicaNum() + "]");
}
}
properties.put(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, Short.toString(minLoadReplicaNum));
olapTable.setMinLoadReplicaNum(minLoadReplicaNum);
olapTable.writeLockOrDdlException();
try {
Env.getCurrentEnv().modifyTableProperties(db, olapTable, properties);
} finally {
olapTable.writeUnlock();
}
}
public Set<Long> getUnfinishedAlterTableIds() {
Set<Long> unfinishedTableIds = Sets.newHashSet();
for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (IndexChangeJob job : ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (AlterJobV2 job : materializedViewHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
return unfinishedTableIds;
}
public AlterHandler getSchemaChangeHandler() {
return schemaChangeHandler;
}
public AlterHandler getMaterializedViewHandler() {
return materializedViewHandler;
}
public AlterHandler getSystemHandler() {
return systemHandler;
}
public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
TableNameInfo tbl = alterMTMV.getMvName();
MTMV mtmv = null;
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb());
mtmv = (MTMV) db.getTableOrMetaException(tbl.getTbl(), TableType.MATERIALIZED_VIEW);
switch (alterMTMV.getOpType()) {
case ALTER_REFRESH_INFO:
mtmv.alterRefreshInfo(alterMTMV.getRefreshInfo());
break;
case ALTER_STATUS:
mtmv.alterStatus(alterMTMV.getStatus());
break;
case ALTER_PROPERTY:
mtmv.alterMvProperties(alterMTMV.getMvProperties());
break;
case ADD_TASK:
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots(),
isReplay);
break;
default:
throw new RuntimeException("Unknown type value: " + alterMTMV.getOpType());
}
if (alterMTMV.isNeedRebuildJob()) {
Env.getCurrentEnv().getMtmvService().alterJob(mtmv, isReplay);
}
// 4. log it and replay it in the follower
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logAlterMTMV(alterMTMV);
}
} catch (UserException e) {
// if MTMV has been dropped, ignore this exception
LOG.warn(e);
}
}
}