// CloudSchemaChangeJobV2.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.proto.OlapFile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Schema change job variant whose index and tablet lifecycle operations
 * (prepare / commit / drop of materialized indexes, tablet creation and
 * schema change job removal) are delegated to {@link CloudInternalCatalog}.
 */
public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
    // Logger is bound to this class (not the parent) so that log lines emitted here
    // are attributed to the cloud implementation.
    private static final Logger LOG = LogManager.getLogger(CloudSchemaChangeJobV2.class);

    /**
     * Creates the job and, when a {@link ConnectContext} is bound to the current thread
     * and reports a non-empty cloud cluster name, records that name on the job.
     *
     * @param rawSql    forwarded to the parent constructor
     * @param jobId     forwarded to the parent constructor
     * @param dbId      forwarded to the parent constructor
     * @param tableId   forwarded to the parent constructor
     * @param tableName forwarded to the parent constructor
     * @param timeoutMs forwarded to the parent constructor
     */
    public CloudSchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId,
            String tableName, long timeoutMs) {
        super(rawSql, jobId, dbId, tableId, tableName, timeoutMs);
        ConnectContext context = ConnectContext.get();
        if (context != null) {
            String clusterName = "";
            try {
                clusterName = context.getCloudCluster();
            } catch (ComputeGroupException e) {
                // Best effort: the job is still created, just without a cluster name.
                LOG.warn("failed to get compute group name", e);
            }
            LOG.debug("schema change job add cloud cluster, context not null, cluster: {}", clusterName);
            if (!Strings.isNullOrEmpty(clusterName)) {
                setCloudClusterName(clusterName);
            }
        }
        LOG.debug("schema change job add cloud cluster, context {}", context);
    }

    // NOTE(review): presumably only used when the job is re-created from persisted
    // metadata -- confirm against callers.
    private CloudSchemaChangeJobV2() {}

    /** Returns the current internal catalog cast to its cloud implementation. */
    private static CloudInternalCatalog cloudCatalog() {
        return (CloudInternalCatalog) Env.getCurrentInternalCatalog();
    }

    /**
     * Commits all shadow indexes of this job through the cloud catalog.
     *
     * @throws AlterCancelException carrying the message of any exception raised by the commit
     */
    @Override
    protected void commitShadowIndex() throws AlterCancelException {
        List<Long> shadowIdxList =
                indexIdMap.keySet().stream().collect(Collectors.toList());
        try {
            cloudCatalog().commitMaterializedIndex(dbId, tableId, shadowIdxList, false);
        } catch (Exception e) {
            LOG.warn("commitMaterializedIndex exception:", e);
            throw new AlterCancelException(e.getMessage());
        }

        LOG.info("commitShadowIndex finished, dbId:{}, tableId:{}, jobId:{}, shadowIdxList:{}",
                dbId, tableId, jobId, shadowIdxList);
    }

    /**
     * Cleans up after a cancelled job: drops the shadow indexes, then removes the
     * schema change job entry of every (shadow tablet, origin tablet) pair.
     * Skipped entirely when {@code Config.enable_check_compatibility_mode} is set.
     *
     * <p>The removal pass is retried from the beginning, with a sleep in between,
     * until one full pass completes without an exception.
     */
    @Override
    protected void onCancel() {
        if (Config.enable_check_compatibility_mode) {
            LOG.info("skip drop shadown indexes in checking compatibility mode");
            return;
        }

        List<Long> shadowIdxList = indexIdMap.keySet().stream().collect(Collectors.toList());
        dropIndex(shadowIdxList);

        int tryTimes = 1;
        while (true) {
            try {
                Set<Table.Cell<Long, Long, Map<Long, Long>>> tableSet = partitionIndexTabletMap.cellSet();
                Iterator<Cell<Long, Long, Map<Long, Long>>> it = tableSet.iterator();
                while (it.hasNext()) {
                    // Cell layout: row = partition id, column = shadow index id,
                    // value = shadow tablet id -> origin tablet id.
                    Table.Cell<Long, Long, Map<Long, Long>> data = it.next();
                    Long partitionId = data.getRowKey();
                    Long shadowIndexId = data.getColumnKey();
                    Long originIndexId = indexIdMap.get(shadowIndexId);
                    Map<Long, Long> shadowTabletIdToOriginTabletId = data.getValue();
                    for (Map.Entry<Long, Long> entry : shadowTabletIdToOriginTabletId.entrySet()) {
                        Long shadowTabletId = entry.getKey();
                        Long originTabletId = entry.getValue();
                        cloudCatalog().removeSchemaChangeJob(jobId, dbId, tableId, originIndexId, shadowIndexId,
                                partitionId, originTabletId, shadowTabletId);
                    }
                    LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms."
                                    + "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}. tabletSize:{}",
                            dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size());
                }
                break;
            } catch (Exception e) {
                LOG.warn("tryTimes:{}, onCancel exception:", tryTimes, e);
            }
            sleepSeveralSeconds();
            tryTimes++;
        }
    }

    /**
     * Drops the origin indexes replaced by this job, unless
     * {@code Config.enable_check_compatibility_mode} is set.
     */
    @Override
    protected void postProcessOriginIndex() {
        if (Config.enable_check_compatibility_mode) {
            LOG.info("skip drop origin indexes in checking compatibility mode");
            return;
        }

        List<Long> originIdxList = indexIdMap.values().stream().collect(Collectors.toList());
        dropIndex(originIdxList);
    }

    /**
     * Drops the given indexes through the cloud catalog, retrying (with a sleep
     * between attempts) until the call succeeds.
     *
     * @param idxList ids of the indexes to drop
     */
    private void dropIndex(List<Long> idxList) {
        int tryTimes = 1;
        while (true) {
            try {
                cloudCatalog().dropMaterializedIndex(tableId, idxList, false);
                break;
            } catch (Exception e) {
                LOG.warn("drop index failed, retry times {}, dbId: {}, tableId: {}, jobId: {}, idxList: {}:",
                        tryTimes, dbId, tableId, jobId, idxList, e);
            }
            sleepSeveralSeconds();
            tryTimes++;
        }

        LOG.info("dropIndex finished, dbId:{}, tableId:{}, jobId:{}, IdxList:{}",
                dbId, tableId, jobId, idxList);
    }

    /**
     * Prepares the shadow indexes and creates their tablets under the table read lock,
     * then registers the shadow indexes in the catalog under the table write lock.
     *
     * @throws AlterCancelException if the database or table cannot be found, or if
     *         preparing the indexes / creating the tablets fails
     */
    @Override
    protected void createShadowIndexReplica() throws AlterCancelException {
        Database db = Env.getCurrentInternalCatalog()
                .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));

        // 1. create replicas
        OlapTable tbl;
        try {
            tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
        } catch (MetaNotFoundException e) {
            throw new AlterCancelException(e.getMessage());
        }

        // Job deadline converted from milliseconds to seconds.
        long expiration = (createTimeMs + timeoutMs) / 1000;
        tbl.readLock();
        try {
            Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
            try {
                List<Long> shadowIdxList = indexIdMap.keySet().stream().collect(Collectors.toList());
                cloudCatalog().prepareMaterializedIndex(tableId, shadowIdxList, expiration);
                createShadowIndexReplicaForPartition(tbl);
            } catch (Exception e) {
                LOG.warn("createCloudShadowIndexReplica Exception:", e);
                throw new AlterCancelException(e.getMessage());
            }
        } finally {
            tbl.readUnlock();
        }

        // create all replicas success.
        // add all shadow indexes to catalog
        tbl.writeLockOrAlterCancelException();
        try {
            Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
            addShadowIndexToCatalog(tbl);
        } finally {
            tbl.writeUnlock();
        }
    }

    /**
     * For every partition recorded in {@code partitionIndexMap} that still exists in
     * {@code tbl}, builds the tablet metas of each shadow index and sends one
     * create-tablets RPC per (partition, shadow index) pair.
     *
     * @param tbl the table being altered
     * @throws Exception propagated from building the tablet metas or sending the RPC
     */
    private void createShadowIndexReplicaForPartition(OlapTable tbl) throws Exception {
        for (long partitionId : partitionIndexMap.rowKeySet()) {
            Partition partition = tbl.getPartition(partitionId);
            if (partition == null) {
                // The partition is no longer present in the table; nothing to create for it.
                continue;
            }
            Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
            for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
                long shadowIdxId = entry.getKey();
                MaterializedIndex shadowIdx = entry.getValue();

                short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
                List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
                // Cluster key uids are only computed for the base index or its shadow.
                List<Integer> clusterKeyUids = null;
                if (shadowIdxId == tbl.getBaseIndexId() || isShadowIndexOfBase(shadowIdxId, tbl)) {
                    clusterKeyUids = OlapTable.getClusterKeyUids(shadowSchema);
                }
                int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
                int shadowSchemaVersion = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion;
                long originIndexId = indexIdMap.get(shadowIdxId);
                KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId);
                // Table-level indexes are only passed along when the origin is the base index.
                List<Index> tabletIndexes = originIndexId == tbl.getBaseIndexId() ? indexes : null;

                Cloud.CreateTabletsRequest.Builder requestBuilder =
                        Cloud.CreateTabletsRequest.newBuilder();
                for (Tablet shadowTablet : shadowIdx.getTablets()) {
                    OlapFile.TabletMetaCloudPB.Builder builder =
                            cloudCatalog().createTabletMetaBuilder(tableId, shadowIdxId,
                                    partitionId, shadowTablet,
                                    tbl.getPartitionInfo().getTabletType(partitionId),
                                    shadowSchemaHash, originKeysType, shadowShortKeyColumnCount, bfColumns,
                                    bfFpp, tabletIndexes, shadowSchema, tbl.getDataSortInfo(),
                                    tbl.getCompressionType(),
                                    tbl.getStoragePolicy(), tbl.isInMemory(), true,
                                    tbl.getName(), tbl.getTTLSeconds(),
                                    tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
                                    shadowSchemaVersion, tbl.getCompactionPolicy(),
                                    tbl.getTimeSeriesCompactionGoalSizeMbytes(),
                                    tbl.getTimeSeriesCompactionFileCountThreshold(),
                                    tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                                    tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                                    tbl.getTimeSeriesCompactionLevelThreshold(),
                                    tbl.disableAutoCompaction(),
                                    tbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
                                    tbl.getInvertedIndexFileStorageFormat(),
                                    tbl.rowStorePageSize(),
                                    tbl.variantEnableFlattenNested(), clusterKeyUids,
                                    tbl.storagePageSize());
                    requestBuilder.addTabletMetas(builder);
                } // end for shadow tablets
                requestBuilder.setDbId(dbId);

                cloudCatalog().sendCreateTabletsRpc(requestBuilder);
            }
        }
    }

    /**
     * Verifies that the cloud cluster recorded on this job can still be resolved by name.
     * If it cannot, every given task is marked finished and removed from the agent task
     * queue before the job is cancelled.
     *
     * @param tasks the alter tasks to discard when the cluster is gone
     * @throws AlterCancelException if no cluster id is found for {@code cloudClusterName}
     */
    @Override
    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {
        if (((CloudSystemInfoService) Env.getCurrentSystemInfo())
                .getCloudClusterIdByName(cloudClusterName) != null) {
            return;
        }

        for (AgentTask task : tasks) {
            task.setFinished(true);
            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
        }
        String msg = "cloud cluster(" + cloudClusterName + ") has been removed, jobId=" + jobId;
        LOG.warn(msg);
        throw new AlterCancelException(msg);
    }

    /**
     * Always reports the table as stable; no check is performed in this implementation.
     *
     * @param db unused
     * @return {@code true}
     */
    @Override
    protected boolean checkTableStable(Database db) throws AlterCancelException {
        return true;
    }
}