// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/*
 * Version 2 of SchemaChangeJob.
 * This replaces the old SchemaChangeJob:
 * https://github.com/apache/doris/issues/1429
 */
public class SchemaChangeJobV2 extends AlterJobV2 {
private static final Logger LOG = LogManager.getLogger(SchemaChangeJobV2.class);
// partition id -> (shadow index id -> (shadow tablet id -> origin tablet id))
@SerializedName(value = "partitionIndexTabletMap")
protected Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = HashBasedTable.create();
// partition id -> (shadow index id -> shadow index)
@SerializedName(value = "partitionIndexMap")
protected Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create();
// shadow index id -> origin index id
@SerializedName(value = "indexIdMap")
protected Map<Long, Long> indexIdMap = Maps.newHashMap();
// partition id -> origin index id
@SerializedName(value = "partitionOriginIndexIdMap")
private Map<Long, Long> partitionOriginIndexIdMap = Maps.newHashMap();
// shadow index id -> shadow index name(__doris_shadow_xxx)
@SerializedName(value = "indexIdToName")
private Map<Long, String> indexIdToName = Maps.newHashMap();
// shadow index id -> index schema
@SerializedName(value = "indexSchemaMap")
protected Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
// shadow index id -> (shadow index schema version : schema hash)
@SerializedName(value = "indexSchemaVersionAndHashMap")
protected Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap();
// shadow index id -> shadow index short key count
@SerializedName(value = "indexShortKeyMap")
protected Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
// bloom filter info
@SerializedName(value = "hasBfChange")
private boolean hasBfChange;
@SerializedName(value = "bfColumns")
protected Set<String> bfColumns = null;
@SerializedName(value = "bfFpp")
protected double bfFpp = 0;
// alter index info
@SerializedName(value = "indexChange")
private boolean indexChange = false;
@SerializedName(value = "indexes")
protected List<Index> indexes = null;
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
@SerializedName(value = "rowStoreColumns")
protected List<String> rowStoreColumns = null;
@SerializedName(value = "storeRowColumn")
protected boolean storeRowColumn = false;
@SerializedName(value = "hasRowStoreChange")
protected boolean hasRowStoreChange = false;
// save all schema change tasks
AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
protected SchemaChangeJobV2() {
super(JobType.SCHEMA_CHANGE);
}
// Do not call this constructor directly; use AlterJobV2Factory to create the job instead
public SchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName,
long timeoutMs) {
super(rawSql, jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
}
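// Record the mapping from a shadow tablet to its origin tablet within the given partition and shadow index.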
public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) {
Map<Long, Long> tabletMap = partitionIndexTabletMap.get(partitionId, shadowIdxId);
if (tabletMap == null) {
tabletMap = Maps.newHashMap();
partitionIndexTabletMap.put(partitionId, shadowIdxId, tabletMap);
}
tabletMap.put(shadowTabletId, originTabletId);
}
public void addPartitionShadowIndex(long partitionId, long shadowIdxId, MaterializedIndex shadowIdx) {
partitionIndexMap.put(partitionId, shadowIdxId, shadowIdx);
}
public void addPartitionOriginIndexIdMap(long partitionId, long originIdxId) {
partitionOriginIndexIdMap.put(partitionId, originIdxId);
}
public void addIndexSchema(long shadowIdxId, long originIdxId,
String shadowIndexName, int shadowSchemaVersion, int shadowSchemaHash,
short shadowIdxShortKeyCount, List<Column> shadowIdxSchema) {
indexIdMap.put(shadowIdxId, originIdxId);
indexIdToName.put(shadowIdxId, shadowIndexName);
indexSchemaVersionAndHashMap.put(shadowIdxId, new SchemaVersionAndHash(shadowSchemaVersion, shadowSchemaHash));
indexShortKeyMap.put(shadowIdxId, shadowIdxShortKeyCount);
indexSchemaMap.put(shadowIdxId, shadowIdxSchema);
}
public void setBloomFilterInfo(boolean hasBfChange, Set<String> bfColumns, double bfFpp) {
this.hasBfChange = hasBfChange;
this.bfColumns = bfColumns;
this.bfFpp = bfFpp;
}
public void setStoreRowColumnInfo(boolean hasRowStoreChange,
boolean storeRowColumn, List<String> rowStoreColumns) {
this.hasRowStoreChange = hasRowStoreChange;
this.storeRowColumn = storeRowColumn;
this.rowStoreColumns = rowStoreColumns;
}
public void setAlterIndexInfo(boolean indexChange, List<Index> indexes) {
this.indexChange = indexChange;
this.indexes = indexes;
}
public void setStorageFormat(TStorageFormat storageFormat) {
this.storageFormat = storageFormat;
}
/**
 * Clear some data structures in this job to save memory.
 * These data structures must not be used in the getInfo method.
 */
private void pruneMeta() {
partitionIndexTabletMap.clear();
partitionIndexMap.clear();
indexSchemaMap.clear();
indexShortKeyMap.clear();
partitionOriginIndexIdMap.clear();
}
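// A shadow index targets the base index if its name, minus the '__doris_shadow_' prefix, resolves to the base index id.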
protected boolean isShadowIndexOfBase(long shadowIdxId, OlapTable tbl) {
if (indexIdToName.get(shadowIdxId).startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
String shadowIndexName = indexIdToName.get(shadowIdxId);
String indexName = shadowIndexName
.substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length());
long indexId = tbl.getIndexIdByName(indexName);
LOG.info("shadow index id: {}, shadow index name: {}, pointer to index id: {}, index name: {}, "
+ "base index id: {}, table_id: {}", shadowIdxId, shadowIndexName, indexId, indexName,
tbl.getBaseIndexId(), tbl.getId());
return indexId == tbl.getBaseIndexId();
}
return false;
}
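/**
 * Create replicas of all shadow indexes on the backends and wait for them to finish,
 * then register the shadow indexes in the catalog.
 */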
protected void createShadowIndexReplica() throws AlterCancelException {
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
if (!checkTableStable(db)) {
return;
}
// 1. create replicas
AgentBatchTask batchTask = new AgentBatchTask();
// count total replica num
int totalReplicaNum = 0;
for (MaterializedIndex shadowIdx : partitionIndexMap.values()) {
for (Tablet tablet : shadowIdx.getTablets()) {
totalReplicaNum += tablet.getReplicas().size();
}
}
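// one latch mark per replica: (backend id, shadow tablet id); finished CREATE tasks count the latch down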
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum);
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
tbl.readLock();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
continue;
}
TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
List<Integer> clusterKeyUids = null;
if (shadowIdxId == tbl.getBaseIndexId() || isShadowIndexOfBase(shadowIdxId, tbl)) {
clusterKeyUids = OlapTable.getClusterKeyUids(shadowSchema);
}
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
long originIndexId = indexIdMap.get(shadowIdxId);
int originSchemaHash = tbl.getSchemaHashByIndexId(originIndexId);
KeysType originKeysType = tbl.getKeysTypeByIndexId(originIndexId);
List<Index> tabletIndexes = originIndexId == tbl.getBaseIndexId() ? indexes : null;
for (Tablet shadowTablet : shadowIdx.getTablets()) {
long shadowTabletId = shadowTablet.getId();
List<Replica> shadowReplicas = shadowTablet.getReplicas();
for (Replica shadowReplica : shadowReplicas) {
long backendId = shadowReplica.getBackendIdWithoutException();
long shadowReplicaId = shadowReplica.getId();
countDownLatch.addMark(backendId, shadowTabletId);
CreateReplicaTask createReplicaTask = new CreateReplicaTask(
backendId, dbId, tableId, partitionId, shadowIdxId, shadowTabletId,
shadowReplicaId, shadowShortKeyColumnCount, shadowSchemaHash,
Partition.PARTITION_INIT_VERSION,
originKeysType, TStorageType.COLUMN, storageMedium,
shadowSchema, bfColumns, bfFpp, countDownLatch, tabletIndexes,
tbl.isInMemory(),
tbl.getPartitionInfo().getTabletType(partitionId),
null,
tbl.getCompressionType(),
tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(),
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.getCompactionPolicy(),
tbl.getTimeSeriesCompactionGoalSizeMbytes(),
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig,
tbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
objectPool,
tbl.rowStorePageSize(),
tbl.variantEnableFlattenNested(),
tbl.storagePageSize());
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
}
createReplicaTask.setInvertedIndexFileStorageFormat(tbl
.getInvertedIndexFileStorageFormat());
if (!CollectionUtils.isEmpty(clusterKeyUids)) {
createReplicaTask.setClusterKeyUids(clusterKeyUids);
LOG.info("table: {}, partition: {}, index: {}, tablet: {}, cluster key uids: {}",
tableId, partitionId, shadowIdxId, shadowTabletId, clusterKeyUids);
}
batchTask.addTask(createReplicaTask);
} // end for shadowReplicas
} // end for shadowTablets
}
}
} finally {
tbl.readUnlock();
}
if (!FeConstants.runningUnitTest) {
// send all tasks and wait for them to finish
AgentTaskQueue.addBatchTask(batchTask);
AgentTaskExecutor.submit(batchTask);
long timeout = DbUtil.getCreateReplicasTimeoutMs(totalReplicaNum);
boolean ok = false;
try {
ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
ok = false;
}
if (!ok || !countDownLatch.getStatus().ok()) {
// creating replicas failed; just cancel the job,
// clear the tasks and show the failed replicas to the user
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
String errMsg = null;
if (!countDownLatch.getStatus().ok()) {
errMsg = countDownLatch.getStatus().getErrorMsg();
} else {
// only show at most 3 results
List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
errMsg = "Error replicas:" + Joiner.on(", ").join(subList);
}
LOG.warn("failed to create replicas for job: {}, {}", jobId, errMsg);
throw new AlterCancelException("Create replicas failed. Error: " + errMsg);
}
}
// all replicas were created successfully;
// add all shadow indexes to the catalog
while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.createShadowIndexReplica.addShadowIndexToCatalog.block")) {
try {
Thread.sleep(1000);
LOG.info("block addShadowIndexToCatalog for job: {}", jobId);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
}
}
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
addShadowIndexToCatalog(tbl);
} finally {
tbl.writeUnlock();
}
}
/**
 * runPendingJob():
 * 1. Create all replicas of all shadow indexes and wait for them to finish.
 * 2. After creation is done, add the shadow indexes to the catalog. Users cannot see these
 * shadow indexes, but the internal load process will generate data for them.
 * 3. Get a new transaction id, then set the job's state to WAITING_TXN.
 */
@Override
protected void runPendingJob() throws Exception {
Preconditions.checkState(jobState == JobState.PENDING, jobState);
LOG.info("begin to send create replica tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
if (!checkTableStable(db)) {
return;
}
createShadowIndexReplica();
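// all load transactions started before this watershed txn id must finish before the data transformation begins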
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
this.jobState = JobState.WAITING_TXN;
// write edit log
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("transfer schema change job {} state to {}, watershed txn id: {}",
jobId, this.jobState, watershedTxnId);
}
protected void addShadowIndexToCatalog(OlapTable tbl) {
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
continue;
}
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (MaterializedIndex shadowIndex : shadowIndexMap.values()) {
Preconditions.checkState(shadowIndex.getState() == IndexState.SHADOW, shadowIndex.getState());
partition.createRollupIndex(shadowIndex);
}
}
for (long shadowIdxId : indexIdMap.keySet()) {
tbl.setIndexMeta(shadowIdxId, indexIdToName.get(shadowIdxId), indexSchemaMap.get(shadowIdxId),
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion,
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash,
indexShortKeyMap.get(shadowIdxId), TStorageType.COLUMN,
tbl.getKeysTypeByIndexId(indexIdMap.get(shadowIdxId)),
indexChange ? indexes : tbl.getIndexMetaByIndexId(indexIdMap.get(shadowIdxId)).getIndexes());
}
tbl.rebuildFullSchema();
}
/**
 * runWaitingTxnJob():
 * 1. Wait for the transactions before the watershedTxnId to finish.
 * 2. If all previous transactions have finished, send schema change tasks to BE.
 * 3. Change the job state to RUNNING.
 */
@Override
protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);
try {
if (!checkFailedPreviousLoadAndAbort()) {
LOG.info("wait transactions before {} to be finished, schema change job: {}", watershedTxnId, jobId);
return;
}
} catch (UserException e) {
throw new AlterCancelException(e.getMessage());
}
LOG.info("previous transactions are all finished, begin to send schema change tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
tbl.readLock();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
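// absolute expiration time of the alter tasks, in seconds since the epoch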
long expiration = (createTimeMs + timeoutMs) / 1000;
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
for (Column column : entry.getValue()) {
indexColumnMap.put(column.getName(), column);
}
}
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
// the schema change task will transform the data up to and including the
// visible version
long visibleVersion = partition.getVisibleVersion();
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
long originIdxId = indexIdMap.get(shadowIdxId);
Map<String, Expr> defineExprs = Maps.newHashMap();
List<Column> fullSchema = tbl.getSchemaByIndexId(originIdxId, true);
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
for (Column column : fullSchema) {
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
destSlotDesc.setIsNullable(column.isAllowNull());
if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName())) {
Column newColumn = indexColumnMap.get(
SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName());
if (!Objects.equals(newColumn.getType(), column.getType())) {
SlotRef slot = new SlotRef(destSlotDesc);
slot.setCol(column.getName());
defineExprs.put(column.getName(), slot.castTo(newColumn.getType()));
}
}
}
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
List<Column> originSchemaColumns = tbl.getSchemaByIndexId(originIdxId, true);
for (Tablet shadowTablet : shadowIdx.getTablets()) {
long shadowTabletId = shadowTablet.getId();
long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
List<Replica> shadowReplicas = shadowTablet.getReplicas();
for (Replica shadowReplica : shadowReplicas) {
AlterReplicaTask alterTask
= new AlterReplicaTask(shadowReplica.getBackendIdWithoutException(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool,
null, expiration, vaultId);
schemaChangeBatchTask.addTask(alterTask);
}
}
}
} // end for partitions
} catch (AnalysisException e) {
throw new AlterCancelException(e.getMessage());
} finally {
tbl.readUnlock();
}
AgentTaskQueue.addBatchTask(schemaChangeBatchTask);
AgentTaskExecutor.submit(schemaChangeBatchTask);
this.jobState = JobState.RUNNING;
// DO NOT write the edit log here; tasks will be sent again if the FE restarts or the master changes.
LOG.info("transfer schema change job {} state to {}", jobId, this.jobState);
}
/**
 * runRunningJob():
 * 1. Wait for all schema change tasks to finish.
 * 2. Check the integrity of the newly created shadow indexes.
 * 3. Replace the origin indexes with the shadow indexes, and set each shadow index's state
 * to NORMAL so it becomes visible to users.
 * 4. Set the job's state to FINISHED.
 */
@Override
protected void runRunningJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.RUNNING, jobState);
// must check whether the db and table still exist first;
// otherwise, if the table is dropped, the tasks will never finish
// and the job will stay in RUNNING state forever
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
if (!schemaChangeBatchTask.isFinished()) {
LOG.info("schema change tasks not finished. job: {}", jobId);
List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
ensureCloudClusterExist(tasks);
for (AgentTask task : tasks) {
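// in cloud mode, DELETE_BITMAP_LOCK_ERROR is treated as transient and retried; any other error fails the task immediately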
int maxFailedTimes = 0;
if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) {
if (task.getErrorCode() != null && task.getErrorCode()
.equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
maxFailedTimes = Config.schema_change_max_retry_time;
}
}
if (task.getFailedTimes() > maxFailedTimes) {
task.setFinished(true);
if (!FeConstants.runningUnitTest) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
LOG.warn("schema change task failed, failedTimes: {}, maxFailedTimes: {}, err: {}",
task.getFailedTimes(), maxFailedTimes, task.getErrorMsg());
List<Long> failedBackends = failedTabletBackends.get(task.getTabletId());
if (failedBackends == null) {
failedBackends = Lists.newArrayList();
failedTabletBackends.put(task.getTabletId(), failedBackends);
}
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
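// cancel the job once fewer than a majority of the replicas can still succeed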
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException(
String.format("schema change tasks failed, error reason: %s", task.getErrorMsg()));
}
}
}
}
return;
}
while (DebugPointUtil.isEnable("FE.SchemaChangeJobV2.runRunning.block")) {
try {
Thread.sleep(1000);
LOG.info("block schema change for job: {}", jobId);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
}
}
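// block new group commit loads on this table and wait for pending WALs to finish before swapping indexes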
Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(tableId);
Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
/*
 * All tasks are finished. Check the integrity:
 * we just check whether all new replicas are healthy.
 */
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) {
long tabletId = entry.getKey();
List<Long> failedBackends = entry.getValue();
for (long backendId : failedBackends) {
invertedIndex.getReplica(tabletId, backendId).setBad(true);
}
}
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
long visibleVersion = partition.getVisibleVersion();
short expectReplicationNum = tbl.getPartitionInfo()
.getReplicaAllocation(partition.getId()).getTotalReplicaNum();
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
List<Replica> replicas = shadowTablet.getReplicas();
int healthyReplicaNum = 0;
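// a replica is healthy if it is not bad, has no failed version, and has caught up to the visible version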
for (Replica replica : replicas) {
if (!replica.isBad() && replica.getLastFailedVersion() < 0
&& replica.checkVersionCatchUp(visibleVersion, false)) {
healthyReplicaNum++;
}
}
if ((healthyReplicaNum < expectReplicationNum / 2 + 1) && !FeConstants.runningUnitTest) {
LOG.warn("shadow tablet {} has too few healthy replicas: {}, schema change job: {},"
+ " healthyReplicaNum: {}, expectReplicationNum: {}",
shadowTablet.getId(), replicas, jobId, healthyReplicaNum, expectReplicationNum);
throw new AlterCancelException(
"shadow tablet " + shadowTablet.getId() + " has too few healthy replicas");
}
} // end for tablets
}
} // end for partitions
commitShadowIndex();
// all partitions are good
onFinished(tbl);
LOG.info("schema change job finished: {}", jobId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();
// Write the edit log while holding the table's write lock, to prevent partitions from being
// added before the edit log is written; otherwise replay would try to transform the index in
// the newly added partition and fail.
Env.getCurrentEnv().getEditLog().logAlterJob(this);
pruneMeta();
} finally {
tbl.writeUnlock();
}
postProcessOriginIndex();
// Drop table column stats after schema change finished.
Env.getCurrentEnv().getAnalysisManager().dropStats(tbl, null);
}
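/**
 * Swap each shadow index into place: drop the origin index, mark shadow replicas as NORMAL,
 * rename '__doris_shadow_xxx' back to 'xxx', and rebuild the table-level metadata.
 */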
private void onFinished(OlapTable tbl) {
// replace the origin index with shadow index, set index state as NORMAL
for (Partition partition : tbl.getPartitions()) {
// drop the origin index from partitions
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIdxId = entry.getKey();
long originIdxId = entry.getValue();
// Get the index from the catalog, not from 'partitionIndexMap':
// if this alter job was recovered from the edit log, the index in 'partitionIndexMap'
// is not the same object as the one in the catalog, so modifications to it would
// not be reflected in the catalog.
MaterializedIndex shadowIdx = partition.getIndex(shadowIdxId);
Preconditions.checkNotNull(shadowIdx, shadowIdxId);
MaterializedIndex droppedIdx = null;
if (originIdxId == partition.getBaseIndex().getId()) {
droppedIdx = partition.getBaseIndex();
} else {
droppedIdx = partition.deleteRollupIndex(originIdxId);
}
Preconditions.checkNotNull(droppedIdx, originIdxId + " vs. " + shadowIdxId);
// set replica state
for (Tablet tablet : shadowIdx.getTablets()) {
List<Long> failedBackends = failedTabletBackends.get(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
replica.setState(ReplicaState.NORMAL);
if (failedBackends != null && failedBackends.contains(replica.getBackendIdWithoutException())) {
replica.setBad(true);
}
}
}
partition.visualiseShadowIndex(shadowIdxId, originIdxId == partition.getBaseIndex().getId());
// delete origin replicas
for (Tablet originTablet : droppedIdx.getTablets()) {
Env.getCurrentInvertedIndex().deleteTablet(originTablet.getId());
}
}
}
// update index schema info of each index
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIdxId = entry.getKey();
long originIdxId = entry.getValue();
String shadowIdxName = tbl.getIndexNameById(shadowIdxId);
String originIdxName = tbl.getIndexNameById(originIdxId);
int maxColUniqueId = tbl.getIndexMetaByIndexId(originIdxId).getMaxColUniqueId();
for (Column column : indexSchemaMap.get(shadowIdxId)) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
tbl.getIndexMetaByIndexId(shadowIdxId).setMaxColUniqueId(maxColUniqueId);
if (LOG.isDebugEnabled()) {
LOG.debug("originIdxId:{}, shadowIdxId:{}, maxColUniqueId:{}, indexSchema:{}",
originIdxId, shadowIdxId, maxColUniqueId, indexSchemaMap.get(shadowIdxId));
}
tbl.deleteIndexInfo(originIdxName);
// the shadow index name is '__doris_shadow_xxx'; rename it to the origin name 'xxx'.
// this also removes the prefix from the column names
tbl.renameIndexForSchemaChange(shadowIdxName, originIdxName);
tbl.renameColumnNamePrefix(shadowIdxId);
if (originIdxId == tbl.getBaseIndexId()) {
// set base index
tbl.setBaseIndexId(shadowIdxId);
}
}
// rebuild table's full schema
tbl.rebuildFullSchema();
tbl.rebuildDistributionInfo();
// update bloom filter
if (hasBfChange) {
tbl.setBloomFilterInfo(bfColumns, bfFpp);
}
// update index
if (indexChange) {
tbl.setIndexes(indexes);
}
// update row store
if (hasRowStoreChange) {
tbl.setStoreRowColumn(storeRowColumn);
tbl.setRowStoreColumns(rowStoreColumns);
}
// set storage format of table, only set if format is v2
if (storageFormat == TStorageFormat.V2) {
tbl.setStorageFormat(storageFormat);
}
}
/*
 * cancelImpl() can be called at any time, from anywhere.
 * We need to clean up any possible residue of this job.
 */
@Override
protected synchronized boolean cancelImpl(String errMsg) {
if (jobState.isFinalState()) {
return false;
}
cancelInternal();
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when cancel, table id: {}, job id: {}", tableId, jobId);
jobState = JobState.CANCELLED;
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
onCancel();
pruneMeta();
return true;
}
private void cancelInternal() {
// clear tasks, if any
AgentTaskQueue.removeBatchTask(schemaChangeBatchTask, TTaskType.ALTER);
// remove all shadow indexes, and set state to NORMAL
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
invertedIndex.deleteTablet(shadowTablet.getId());
}
partition.deleteRollupIndex(shadowIdx.getId());
}
}
for (String shadowIndexName : indexIdToName.values()) {
tbl.deleteIndexInfo(shadowIndexName);
}
} finally {
tbl.writeUnlock();
}
}
}
}
// Check whether all transactions of the given database whose txnId is less than 'watershedTxnId'
// have finished, and abort any that have failed.
// Returns true if all previous loads are finished.
protected boolean checkFailedPreviousLoadAndAbort() throws UserException {
List<TransactionState> unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad(
watershedTxnId, dbId, Lists.newArrayList(tableId));
if (Config.enable_abort_txn_by_checking_conflict_txn) {
List<TransactionState> failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns);
for (TransactionState txn : failedTxns) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change");
}
}
return unFinishedTxns.isEmpty();
}
/**
 * Replay job in PENDING state.
 * Should replay all changes made before this job's state transferred to PENDING.
 * These changes should be the same as the changes in SchemaChangeHandler.createJob().
 */
private void replayCreateJob(SchemaChangeJobV2 replayedJob) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Cell<Long, Long, MaterializedIndex> cell : partitionIndexMap.cellSet()) {
long partitionId = cell.getRowKey();
long shadowIndexId = cell.getColumnKey();
MaterializedIndex shadowIndex = cell.getValue();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
for (Tablet shadowTablet : shadowIndex.getTablets()) {
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
invertedIndex.addTablet(shadowTablet.getId(), shadowTabletMeta);
for (Replica shadowReplica : shadowTablet.getReplicas()) {
invertedIndex.addReplica(shadowTablet.getId(), shadowReplica);
}
}
}
// set table state
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
} finally {
olapTable.writeUnlock();
}
this.watershedTxnId = replayedJob.watershedTxnId;
jobState = JobState.PENDING;
LOG.info("replay pending schema change job: {}, table id: {}", jobId, tableId);
}
/**
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
private void replayPendingJob(SchemaChangeJobV2 replayedJob) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
addShadowIndexToCatalog(olapTable);
} finally {
olapTable.writeUnlock();
}
// should still be in WAITING_TXN state, so that the alter tasks will be re-sent
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = replayedJob.watershedTxnId;
LOG.info("replay waiting txn schema change job: {} table id: {}", jobId, tableId);
}
/**
* Replay job in FINISHED state.
* Should replay all changes in runRunningJob()
*/
private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
onFinished(tbl);
} finally {
tbl.writeUnlock();
}
}
}
postProcessOriginIndex();
jobState = JobState.FINISHED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
LOG.info("replay finished schema change job: {} table id: {}", jobId, tableId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay finished, table id: {}, job id: {}", tableId, jobId);
pruneMeta();
}
/**
* Replay job in CANCELLED state.
*/
private void replayCancelled(SchemaChangeJobV2 replayedJob) {
cancelInternal();
// try our best to drop the shadow index
onCancel();
this.jobState = JobState.CANCELLED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
this.errMsg = replayedJob.errMsg;
LOG.info("replay cancelled schema change job: {}", jobId);
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL when replay cancelled, table id: {}, job id: {}", tableId, jobId);
pruneMeta();
}
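// Dispatch replay according to the state recorded in the replayed job.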
@Override
public void replay(AlterJobV2 replayedJob) {
try {
SchemaChangeJobV2 replayedSchemaChangeJob = (SchemaChangeJobV2) replayedJob;
switch (replayedJob.jobState) {
case PENDING:
replayCreateJob(replayedSchemaChangeJob);
break;
case WAITING_TXN:
replayPendingJob(replayedSchemaChangeJob);
break;
case FINISHED:
replayRunningJob(replayedSchemaChangeJob);
break;
case CANCELLED:
replayCancelled(replayedSchemaChangeJob);
break;
default:
break;
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] replay schema change job failed {}", replayedJob.getJobId(), e);
}
}
@Override
protected void getInfo(List<List<Comparable>> infos) {
// calculate progress first; all indexes share the same progress
String progress = FeConstants.null_string;
if (jobState == JobState.RUNNING && schemaChangeBatchTask.getTaskNum() > 0) {
progress = schemaChangeBatchTask.getFinishedTaskNum() + "/" + schemaChangeBatchTask.getTaskNum();
}
// one line for one shadow index
for (Map.Entry<Long, Long> entry : indexIdMap.entrySet()) {
long shadowIndexId = entry.getKey();
List<Comparable> info = Lists.newArrayList();
info.add(jobId);
info.add(tableName);
info.add(TimeUtils.longToTimeStringWithms(createTimeMs));
info.add(TimeUtils.longToTimeStringWithms(finishedTimeMs));
// only show the origin index name
info.add(indexIdToName.get(shadowIndexId).substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
info.add(shadowIndexId);
info.add(entry.getValue());
info.add(indexSchemaVersionAndHashMap.get(shadowIndexId).toString());
info.add(watershedTxnId);
info.add(jobState.name());
info.add(errMsg);
info.add(progress);
info.add(timeoutMs / 1000);
infos.add(info);
}
}
public Map<Long, Long> getIndexIdMap() {
return indexIdMap;
}
public List<List<String>> getUnfinishedTasks(int limit) {
List<List<String>> taskInfos = Lists.newArrayList();
if (jobState == JobState.RUNNING) {
List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(limit);
for (AgentTask agentTask : tasks) {
AlterReplicaTask alterTask = (AlterReplicaTask) agentTask;
List<String> info = Lists.newArrayList();
info.add(String.valueOf(alterTask.getBackendId()));
info.add(String.valueOf(alterTask.getBaseTabletId()));
info.add(String.valueOf(alterTask.getSignature()));
taskInfos.add(info);
}
}
return taskInfos;
}
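// Reset the table to the given state, but only if it is still in SCHEMA_CHANGE; a no-op if the state already matches.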
private void changeTableState(long dbId, long tableId, OlapTableState olapTableState) {
try {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLockOrMetaException();
try {
if (olapTable.getState() == olapTableState) {
return;
} else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) {
olapTable.setState(olapTableState);
}
} finally {
olapTable.writeUnlock();
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e);
}
}
// commit the shadow index after the job is done, in cloud mode
protected void commitShadowIndex() throws AlterCancelException {}
// try our best to drop the shadow index when the job is cancelled, in cloud mode
protected void onCancel() {}
// try our best to drop the origin index, in cloud mode
protected void postProcessOriginIndex() {}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
Text.writeString(out, json);
}
@Override
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}