RollupJobV2.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.MVColumnItem;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Version 2 of RollupJob.
 * It replaces the old RollupJob; see:
* https://github.com/apache/doris/issues/1429
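 * The job state moves from PENDING -> WAITING_TXN -> RUNNING -> FINISHED, or to CANCELLED on failure.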
*/
public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(RollupJobV2.class);
// partition id -> (rollup tablet id -> base tablet id)
@SerializedName(value = "partitionIdToBaseRollupTabletIdMap")
protected Map<Long, Map<Long, Long>> partitionIdToBaseRollupTabletIdMap = Maps.newHashMap();
@SerializedName(value = "partitionIdToRollupIndex")
protected Map<Long, MaterializedIndex> partitionIdToRollupIndex = Maps.newHashMap();
// rollup and base schema info
@SerializedName(value = "baseIndexId")
protected long baseIndexId;
@SerializedName(value = "rollupIndexId")
protected long rollupIndexId;
@SerializedName(value = "baseIndexName")
protected String baseIndexName;
@SerializedName(value = "rollupIndexName")
protected String rollupIndexName;
@SerializedName(value = "rollupSchema")
protected List<Column> rollupSchema = Lists.newArrayList();
@SerializedName(value = "whereColumn")
protected Column whereColumn;
@SerializedName(value = "baseSchemaHash")
protected int baseSchemaHash;
@SerializedName(value = "rollupSchemaHash")
protected int rollupSchemaHash;
@SerializedName(value = "rollupKeysType")
protected KeysType rollupKeysType;
@SerializedName(value = "rollupShortKeyColumnCount")
protected short rollupShortKeyColumnCount;
@SerializedName(value = "origStmt")
protected OriginStatement origStmt;
// optional
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
    // holds all the rollup tasks (AlterReplicaTask) sent to the BEs
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
private Analyzer analyzer;
protected RollupJobV2() {
super(JobType.ROLLUP);
}
    // Do not call this directly; use AlterJobV2Factory to create the job instead.
public RollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId,
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
Column whereColumn,
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
short rollupShortKeyColumnCount,
OriginStatement origStmt) throws AnalysisException {
super(rawSql, jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
this.baseIndexId = baseIndexId;
this.rollupIndexId = rollupIndexId;
this.baseIndexName = baseIndexName;
this.rollupIndexName = rollupIndexName;
this.rollupSchema = rollupSchema;
this.whereColumn = whereColumn;
this.baseSchemaHash = baseSchemaHash;
this.rollupSchemaHash = rollupSchemaHash;
this.rollupKeysType = rollupKeysType;
this.rollupShortKeyColumnCount = rollupShortKeyColumnCount;
this.origStmt = origStmt;
initAnalyzer();
}
public void addTabletIdMap(long partitionId, long rollupTabletId, long baseTabletId) {
Map<Long, Long> tabletIdMap = partitionIdToBaseRollupTabletIdMap
.computeIfAbsent(partitionId, k -> Maps.newHashMap());
tabletIdMap.put(rollupTabletId, baseTabletId);
}
public void addMVIndex(long partitionId, MaterializedIndex mvIndex) {
this.partitionIdToRollupIndex.put(partitionId, mvIndex);
}
public void setStorageFormat(TStorageFormat storageFormat) {
this.storageFormat = storageFormat;
}
public long getRollupIndexId() {
return rollupIndexId;
}
public String getRollupIndexName() {
return rollupIndexName;
}
public long getBaseIndexId() {
return baseIndexId;
}
public String getBaseIndexName() {
return baseIndexName;
}
protected void initAnalyzer() throws AnalysisException {
ConnectContext connectContext = new ConnectContext();
Database db;
try {
db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
} catch (MetaNotFoundException e) {
throw new AnalysisException("error happens when parsing create materialized view stmt: " + origStmt, e);
}
connectContext.setDatabase(db.getFullName());
analyzer = new Analyzer(Env.getCurrentEnv(), connectContext);
}
protected void createRollupReplica() throws AlterCancelException {
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
// 1. create rollup replicas
AgentBatchTask batchTask = new AgentBatchTask();
// count total replica num
int totalReplicaNum = 0;
for (MaterializedIndex rollupIdx : partitionIdToRollupIndex.values()) {
for (Tablet tablet : rollupIdx.getTablets()) {
totalReplicaNum += tablet.getReplicas().size();
}
}
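        // one latch count per rollup replica to be created; the unfinished (backendId, tabletId)
        // marks are used to report which replicas failed if the wait times out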
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
tbl.readLock();
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
continue;
}
TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
MaterializedIndex rollupIndex = entry.getValue();
Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
long rollupTabletId = rollupTablet.getId();
List<Replica> rollupReplicas = rollupTablet.getReplicas();
for (Replica rollupReplica : rollupReplicas) {
long backendId = rollupReplica.getBackendIdWithoutException();
long rollupReplicaId = rollupReplica.getId();
Preconditions.checkNotNull(tabletIdMap.get(rollupTabletId)); // baseTabletId
countDownLatch.addMark(backendId, rollupTabletId);
                        // create the replica with version 1; the version will be updated by the
                        // subsequent load process, or when the rollup task finishes.
CreateReplicaTask createReplicaTask = new CreateReplicaTask(
backendId, dbId, tableId, partitionId, rollupIndexId, rollupTabletId,
rollupReplicaId, rollupShortKeyColumnCount, rollupSchemaHash,
Partition.PARTITION_INIT_VERSION,
rollupKeysType, TStorageType.COLUMN, storageMedium,
rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch,
null, // do not copy indexes of base tablet to ROLLUP tablet
tbl.isInMemory(),
tabletType,
null,
tbl.getCompressionType(),
tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(),
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
tbl.getCompactionPolicy(),
tbl.getTimeSeriesCompactionGoalSizeMbytes(),
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig,
tbl.getRowStoreColumnsUniqueIds(tbl.getTableProperty().getCopiedRowStoreColumns()),
objectPool,
tbl.rowStorePageSize(),
tbl.variantEnableFlattenNested(),
tbl.storagePageSize());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
}
                        // a rollup replica does not need to set merge-on-write (MOW) cluster keys
batchTask.addTask(createReplicaTask);
} // end for rollupReplicas
} // end for rollupTablets
}
} finally {
tbl.readUnlock();
}
if (!FeConstants.runningUnitTest) {
            // send all tasks and wait for them to finish
AgentTaskQueue.addBatchTask(batchTask);
AgentTaskExecutor.submit(batchTask);
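            // the wait timeout is derived from the total number of replicas to create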
long timeout = DbUtil.getCreateReplicasTimeoutMs(totalReplicaNum);
boolean ok = false;
try {
ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
ok = false;
}
if (!ok || !countDownLatch.getStatus().ok()) {
                // creating rollup replicas failed; just cancel the job,
                // clear the tasks and show the failed replicas to the user
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
String errMsg = null;
if (!countDownLatch.getStatus().ok()) {
errMsg = countDownLatch.getStatus().getErrorMsg();
} else {
// only show at most 3 results
List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
.collect(Collectors.toList());
errMsg = "Error replicas:" + Joiner.on(", ").join(subList);
}
LOG.warn("failed to create rollup replicas for job: {}, {}", jobId, errMsg);
throw new AlterCancelException("Create rollup replicas failed. Error: " + errMsg);
}
}
        // all rollup replicas were created successfully;
        // add the rollup index to the catalog
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
addRollupIndexToCatalog(tbl);
} finally {
tbl.writeUnlock();
}
}
/**
* runPendingJob():
     * 1. Create all rollup replicas and wait for them to finish.
     * 2. After creation is done, add this shadow rollup index to the catalog. Users cannot see the
     * rollup yet, but the internal load process will already generate data for it.
     * 3. Get a new transaction id, then set the job's state to WAITING_TXN.
*/
@Override
protected void runPendingJob() throws Exception {
Preconditions.checkState(jobState == JobState.PENDING, jobState);
LOG.info("begin to send create rollup replica tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
if (!checkTableStable(db)) {
return;
}
createRollupReplica();
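        // loads with txn ids below this watershed must all finish before the rollup tasks are sent
        // (see runWaitingTxnJob)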
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
this.jobState = JobState.WAITING_TXN;
// write edit log
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("transfer rollup job {} state to {}, watershed txn id: {}", jobId, this.jobState, watershedTxnId);
}
protected void addRollupIndexToCatalog(OlapTable tbl) {
for (Partition partition : tbl.getPartitions()) {
long partitionId = partition.getId();
MaterializedIndex rollupIndex = this.partitionIdToRollupIndex.get(partitionId);
Preconditions.checkNotNull(rollupIndex);
Preconditions.checkState(rollupIndex.getState() == IndexState.SHADOW, rollupIndex.getState());
partition.createRollupIndex(rollupIndex);
}
tbl.setIndexMeta(rollupIndexId, rollupIndexName, rollupSchema, 0 /* init schema version */,
rollupSchemaHash, rollupShortKeyColumnCount, TStorageType.COLUMN,
rollupKeysType, origStmt, analyzer != null ? new Analyzer(analyzer) : analyzer, null);
tbl.rebuildFullSchema();
}
/**
* runWaitingTxnJob():
     * 1. Wait for the transactions before the watershedTxnId to finish.
     * 2. If all previous transactions have finished, send the rollup tasks to the BEs.
* 3. Change job state to RUNNING.
*/
@Override
protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);
try {
if (!checkFailedPreviousLoadAndAbort()) {
LOG.info("wait transactions before {} to be finished, rollup job: {}", watershedTxnId, jobId);
return;
}
} catch (UserException e) {
throw new AlterCancelException(e.getMessage());
}
LOG.info("previous transactions are all finished, begin to send rollup tasks. job: {}", jobId);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Databasee " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
tbl.readLock();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
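        // job deadline in seconds: createTimeMs and timeoutMs are both in milliseconds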
long expiration = (createTimeMs + timeoutMs) / 1000;
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
                // the rollup task will transform the data up to the visible version (inclusive).
long visibleVersion = partition.getVisibleVersion();
Map<String, Expr> defineExprs = Maps.newHashMap();
MaterializedIndex rollupIndex = entry.getValue();
Map<Long, Long> tabletIdMap = this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
long rollupTabletId = rollupTablet.getId();
long baseTabletId = tabletIdMap.get(rollupTabletId);
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
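                    // case-insensitive map so SlotRefs in define exprs resolve regardless of column-name case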
Map<String, SlotDescriptor> descMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
List<Column> rollupColumns = new ArrayList<Column>();
Set<String> columnNames = new HashSet<String>();
for (Column column : tbl.getBaseSchema()) {
rollupColumns.add(column);
columnNames.add(column.getName());
}
for (Column column : rollupSchema) {
if (columnNames.contains(column.getName())) {
continue;
}
rollupColumns.add(column);
}
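                    // whereColumn carries the MV's WHERE predicate (restored in gsonPostProcess);
                    // it is passed to the alter task as whereClause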
Expr whereClause = null;
if (whereColumn != null) {
whereClause = whereColumn.getDefineExpr();
rollupColumns.add(whereColumn);
}
for (Column column : rollupColumns) {
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
destSlotDesc.setIsNullable(column.isAllowNull());
descMap.put(column.getName(), destSlotDesc);
}
for (Column column : rollupColumns) {
if (column.getDefineExpr() != null) {
if (whereColumn != column) {
defineExprs.put(column.getName(), column.getDefineExpr());
}
List<SlotRef> slots = new ArrayList<>();
column.getDefineExpr().collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
SlotDescriptor slotDesc = descMap.get(slot.getColumnName());
if (slotDesc == null) {
throw new AlterCancelException("slotDesc is null, slot=" + slot.getColumnName()
+ ", column=" + column.getName());
}
slot.setDesc(slotDesc);
}
}
}
List<Replica> rollupReplicas = rollupTablet.getReplicas();
for (Replica rollupReplica : rollupReplicas) {
AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendIdWithoutException(),
dbId, tableId, partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
objectPool, whereClause, expiration, vaultId);
rollupBatchTask.addTask(rollupTask);
}
}
}
} finally {
tbl.readUnlock();
}
AgentTaskQueue.addBatchTask(rollupBatchTask);
AgentTaskExecutor.submit(rollupBatchTask);
this.jobState = JobState.RUNNING;
        // DO NOT write the edit log here; the tasks will be sent again if the FE restarts or the master changes.
LOG.info("transfer rollup job {} state to {}", jobId, this.jobState);
}
/**
* runRunningJob()
     * 1. Wait for all rollup tasks to finish.
     * 2. Check the integrity of the newly created rollup index.
     * 3. Set the rollup index's state to NORMAL to make it visible to queries.
     * 4. Set the job's state to FINISHED.
*/
@Override
protected void runRunningJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.RUNNING, jobState);
        // must check whether the db and table still exist first.
        // otherwise, if the table has been dropped, the tasks will never finish
        // and the job will stay in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Databasee " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
LOG.debug("jobId:{}, cloudClusterName:{}", jobId, cloudClusterName);
if (!rollupBatchTask.isFinished()) {
LOG.info("rollup tasks not finished. job: {}", jobId);
List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
ensureCloudClusterExist(tasks);
for (AgentTask task : tasks) {
int maxFailedTimes = 0;
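                // in cloud mode, tasks failing with DELETE_BITMAP_LOCK_ERROR may retry
                // up to schema_change_max_retry_time times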
if (Config.isCloudMode() && Config.enable_schema_change_retry_in_cloud_mode) {
if (task.getErrorCode() != null && task.getErrorCode()
.equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
maxFailedTimes = Config.schema_change_max_retry_time;
}
}
if (task.getFailedTimes() > maxFailedTimes) {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
LOG.warn("rollup task failed, failedTimes: {}, maxFailedTimes: {}, err: {}",
task.getFailedTimes(), maxFailedTimes, task.getErrorMsg());
List<Long> failedBackends = failedTabletBackends.get(task.getTabletId());
if (failedBackends == null) {
failedBackends = Lists.newArrayList();
failedTabletBackends.put(task.getTabletId(), failedBackends);
}
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
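                    // cancel once the tasks that can still succeed no longer form a majority (n / 2 + 1)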
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("rollup tasks failed on same tablet reach threshold "
+ failedTaskCount + ", reason=" + task.getErrorMsg());
}
}
}
return;
}
/*
* all tasks are finished. check the integrity.
* we just check whether all rollup replicas are healthy.
*/
tbl.writeLockOrAlterCancelException();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Map.Entry<Long, List<Long>> entry : failedTabletBackends.entrySet()) {
long tabletId = entry.getKey();
List<Long> failedBackends = entry.getValue();
for (long backendId : failedBackends) {
invertedIndex.getReplica(tabletId, backendId).setBad(true);
}
}
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
continue;
}
long visibleVersion = partition.getVisibleVersion();
short expectReplicationNum = tbl.getPartitionInfo().getReplicaAllocation(
partitionId).getTotalReplicaNum();
MaterializedIndex rollupIndex = entry.getValue();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
List<Replica> replicas = rollupTablet.getReplicas();
int healthyReplicaNum = 0;
for (Replica replica : replicas) {
if (!replica.isBad() && replica.getLastFailedVersion() < 0
&& replica.checkVersionCatchUp(visibleVersion, false)) {
healthyReplicaNum++;
}
}
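                    // require a majority (replicationNum / 2 + 1) of healthy replicas per rollup tablet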
                    if (healthyReplicaNum < expectReplicationNum / 2 + 1) {
                        LOG.warn("rollup tablet {} has too few healthy replicas: {}, rollup job: {}",
                                rollupTablet.getId(), replicas, jobId);
                        throw new AlterCancelException("rollup tablet " + rollupTablet.getId()
                                + " has too few healthy replicas");
}
} // end for tablets
} // end for partitions
onCreateRollupReplicaDone();
onFinished(tbl);
} finally {
tbl.writeUnlock();
}
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logAlterJob(this);
LOG.info("rollup job finished: {}", jobId);
}
private void onFinished(OlapTable tbl) {
for (Partition partition : tbl.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId);
Preconditions.checkNotNull(rollupIndex, rollupIndexId);
for (Tablet tablet : rollupIndex.getTablets()) {
List<Long> failedBackends = failedTabletBackends.get(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
replica.setState(ReplicaState.NORMAL);
if (failedBackends != null && failedBackends.contains(replica.getBackendIdWithoutException())) {
replica.setBad(true);
}
}
}
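            // turn the shadow rollup index into a normal index that is visible to queries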
partition.visualiseShadowIndex(rollupIndexId, false);
}
        // update the max column unique id
int maxColUniqueId = tbl.getIndexMetaByIndexId(rollupIndexId).getMaxColUniqueId();
for (Column column : tbl.getIndexMetaByIndexId(rollupIndexId).getSchema(true)) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
tbl.getIndexMetaByIndexId(rollupIndexId).setMaxColUniqueId(maxColUniqueId);
if (LOG.isDebugEnabled()) {
LOG.debug("rollupIndexId:{}, maxColUniqueId:{}, indexIdToSchema:{}", rollupIndexId, maxColUniqueId,
tbl.getIndexIdToSchema(true));
}
tbl.rebuildFullSchema();
}
/**
     * cancelImpl() can be called at any time, from anywhere.
     * We need to clean up any possible residue of this job.
*/
@Override
protected boolean cancelImpl(String errMsg) {
if (jobState.isFinalState()) {
return false;
}
cancelInternal();
jobState = JobState.CANCELLED;
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logAlterJob(this);
        // try our best to drop the rollup index when the job is cancelled
onCancel();
LOG.info("cancel {} job {}, err: {}", this.type, jobId, errMsg);
return true;
}
private void cancelInternal() {
        // clear tasks, if any
AgentTaskQueue.removeBatchTask(rollupBatchTask, TTaskType.ALTER);
// remove all rollup indexes, and set state to NORMAL
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(rollupTablet.getId());
}
Partition partition = tbl.getPartition(partitionId);
partition.deleteRollupIndex(rollupIndexId);
}
tbl.deleteIndexInfo(rollupIndexName);
} finally {
tbl.writeUnlock();
}
}
}
}
    // Check whether any transaction of the given database whose txnId is less than 'watershedTxnId' has failed,
    // and abort it if so.
    // Returns true if all previous loads have finished.
protected boolean checkFailedPreviousLoadAndAbort() throws UserException {
List<TransactionState> unFinishedTxns = Env.getCurrentGlobalTransactionMgr().getUnFinishedPreviousLoad(
watershedTxnId, dbId, Lists.newArrayList(tableId));
if (Config.enable_abort_txn_by_checking_conflict_txn) {
List<TransactionState> failedTxns = GlobalTransactionMgr.checkFailedTxns(unFinishedTxns);
for (TransactionState txn : failedTxns) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(txn.getDbId(), txn.getTransactionId(), "Cancel by schema change");
}
}
return unFinishedTxns.isEmpty();
}
/**
* Replay job in PENDING state.
     * Should replay all changes made before this job's state transferred to PENDING.
     * These changes should be the same as those in RollupHandler.processAddRollup().
*/
private void replayCreateJob(RollupJobV2 replayedJob) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLock();
try {
addTabletToInvertedIndex(olapTable);
} finally {
olapTable.writeUnlock();
}
// to make sure that this job will run runPendingJob() again to create the rollup replicas
this.jobState = JobState.PENDING;
this.watershedTxnId = replayedJob.watershedTxnId;
LOG.info("replay pending rollup job: {}", jobId);
}
private void addTabletToInvertedIndex(OlapTable tbl) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
// add all rollup replicas to tablet inverted index
for (Long partitionId : partitionIdToRollupIndex.keySet()) {
MaterializedIndex rollupIndex = partitionIdToRollupIndex.get(partitionId);
TStorageMedium medium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
for (Tablet rollupTablet : rollupIndex.getTablets()) {
TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, partitionId, rollupIndexId,
rollupSchemaHash, medium);
invertedIndex.addTablet(rollupTablet.getId(), rollupTabletMeta);
for (Replica rollupReplica : rollupTablet.getReplicas()) {
invertedIndex.addReplica(rollupTablet.getId(), rollupReplica);
}
}
}
}
/**
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
private void replayPendingJob(RollupJobV2 replayedJob) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLock();
try {
addRollupIndexToCatalog(olapTable);
} finally {
olapTable.writeUnlock();
}
        // should still be in WAITING_TXN state, so that the alter tasks will be resent
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = replayedJob.watershedTxnId;
LOG.info("replay waiting txn rollup job: {}", jobId);
}
/**
* Replay job in FINISHED state.
     * Should replay all changes in runRunningJob().
*/
private void replayRunningJob(RollupJobV2 replayedJob) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTableNullable(tableId);
if (tbl != null) {
tbl.writeLock();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
onFinished(tbl);
} finally {
tbl.writeUnlock();
}
}
}
this.jobState = JobState.FINISHED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
LOG.info("replay finished rollup job: {}", jobId);
}
/**
* Replay job in CANCELLED state.
*/
private void replayCancelled(RollupJobV2 replayedJob) {
cancelInternal();
        // try our best to drop the rollup index when the job is cancelled
onCancel();
this.jobState = JobState.CANCELLED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
this.errMsg = replayedJob.errMsg;
LOG.info("replay cancelled rollup job: {}", jobId);
}
@Override
public void replay(AlterJobV2 replayedJob) {
try {
RollupJobV2 replayedRollupJob = (RollupJobV2) replayedJob;
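            // the logged state is the state the job had reached; each case replays the changes that produced it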
switch (replayedJob.jobState) {
case PENDING:
replayCreateJob(replayedRollupJob);
break;
case WAITING_TXN:
replayPendingJob(replayedRollupJob);
break;
case FINISHED:
replayRunningJob(replayedRollupJob);
break;
case CANCELLED:
replayCancelled(replayedRollupJob);
break;
default:
break;
}
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] replay rollup job failed {}", replayedJob.getJobId(), e);
}
}
@Override
protected void getInfo(List<List<Comparable>> infos) {
List<Comparable> info = Lists.newArrayList();
info.add(jobId);
info.add(tableName);
info.add(TimeUtils.longToTimeString(createTimeMs));
info.add(TimeUtils.longToTimeString(finishedTimeMs));
info.add(baseIndexName);
info.add(rollupIndexName);
info.add(rollupIndexId);
info.add(watershedTxnId);
info.add(jobState.name());
info.add(errMsg);
// progress
if (jobState == JobState.RUNNING && rollupBatchTask.getTaskNum() > 0) {
info.add(rollupBatchTask.getFinishedTaskNum() + "/" + rollupBatchTask.getTaskNum());
} else {
info.add(FeConstants.null_string);
}
info.add(timeoutMs / 1000);
infos.add(info);
}
public List<List<String>> getUnfinishedTasks(int limit) {
List<List<String>> taskInfos = Lists.newArrayList();
if (jobState == JobState.RUNNING) {
List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(limit);
for (AgentTask agentTask : tasks) {
AlterReplicaTask rollupTask = (AlterReplicaTask) agentTask;
List<String> info = Lists.newArrayList();
info.add(String.valueOf(rollupTask.getBackendId()));
info.add(String.valueOf(rollupTask.getBaseTabletId()));
info.add(String.valueOf(rollupTask.getSignature()));
taskInfos.add(info);
}
}
return taskInfos;
}
public Map<Long, MaterializedIndex> getPartitionIdToRollupIndex() {
return partitionIdToRollupIndex;
}
public void setJobState(JobState jobState) {
this.jobState = jobState;
}
private void setColumnsDefineExpr(List<MVColumnItem> mvColumnItemList) {
for (MVColumnItem mvColumnItem : mvColumnItemList) {
for (Column column : rollupSchema) {
if (column.getName().equals(mvColumnItem.getName())) {
column.setDefineExpr(mvColumnItem.getDefineExpr());
break;
}
}
}
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
Text.writeString(out, json);
}
@Override
public void gsonPostProcess() throws IOException {
// analyze define stmt
if (origStmt == null) {
return;
}
if (jobState != JobState.PENDING) {
return;
}
        // parse the define stmt to restore the schema's define exprs
SqlParser parser = new SqlParser(new SqlScanner(
new StringReader(origStmt.originStmt), SqlModeHelper.MODE_DEFAULT));
CreateMaterializedViewStmt stmt = null;
try {
initAnalyzer();
stmt = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
stmt.setIsReplay(true);
stmt.analyze(analyzer);
} catch (Exception e) {
            // Under normal circumstances, the stmt will not fail to analyze.
            // In some cases (such as DROP TABLE FORCE), analysis may fail because the cancel
            // of this job is not included in the checkpoint.
jobState = JobState.CANCELLED;
LOG.warn("error happens when parsing create materialized view stmt: " + stmt, e);
return;
}
setColumnsDefineExpr(stmt.getMVColumnItemList());
if (whereColumn != null) {
whereColumn.setDefineExpr(stmt.getWhereClause());
}
}
protected void onCreateRollupReplicaDone() throws AlterCancelException {}
    // try our best to drop the rollup index when the job is cancelled
protected void onCancel() {}
@Override
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}