IndexChangeJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class IndexChangeJob implements Writable {
private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class);
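    // Task retry thresholds: a task that repeatedly fails with OBTAIN_LOCK_FAILED is retried
    // up to MAX_FAILED_NUM times; any other persistent error cancels the job after MIN_FAILED_NUM failures.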
static final int MAX_FAILED_NUM = 10;
static final int MIN_FAILED_NUM = 3;
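    /**
     * Job state machine: WAITING_TXN -> RUNNING -> FINISHED.
     * A job in any non-final state may also transfer to CANCELLED, e.g. on timeout or task failure.
     */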
public enum JobState {
// CHECKSTYLE OFF
WAITING_TXN, // waiting for previous txns to be finished
// CHECKSTYLE ON
        RUNNING, // waiting for inverted index tasks to finish
FINISHED, // job is done
CANCELLED; // job is cancelled
public boolean isFinalState() {
return this == JobState.FINISHED || this == JobState.CANCELLED;
}
}
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "jobState")
private JobState jobState;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "tableName")
private String tableName;
@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "partitionName")
private String partitionName;
@SerializedName(value = "errMsg")
private String errMsg = "";
@SerializedName(value = "createTimeMs")
private long createTimeMs = -1;
@SerializedName(value = "finishedTimeMs")
private long finishedTimeMs = -1;
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
@SerializedName(value = "isDropOp")
private boolean isDropOp = false;
@SerializedName(value = "alterInvertedIndexes")
private List<Index> alterInvertedIndexes = null;
@SerializedName(value = "originIndexId")
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;
public IndexChangeJob() {
this.jobId = -1;
this.dbId = -1;
this.tableId = -1;
this.tableName = "";
this.createTimeMs = System.currentTimeMillis();
this.jobState = JobState.WAITING_TXN;
}
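    // Normal construction path: record a "watershed" transaction id so that index files are only
    // built or dropped after all loads submitted before this job have finished.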
public IndexChangeJob(long jobId, long dbId, long tableId, String tableName, long timeoutMs) throws Exception {
this.jobId = jobId;
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.createTimeMs = System.currentTimeMillis();
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr().getNextTransactionId();
this.timeoutMs = timeoutMs;
}
public long getJobId() {
return jobId;
}
public long getOriginIndexId() {
return originIndexId;
}
public JobState getJobState() {
return jobState;
}
public void setJobState(JobState jobState) {
this.jobState = jobState;
}
public void setPartitionId(long partitionId) {
this.partitionId = partitionId;
}
public void setPartitionName(String partitionName) {
this.partitionName = partitionName;
}
public void setOriginIndexId(long originIndexId) {
this.originIndexId = originIndexId;
}
public void setAlterInvertedIndexInfo(boolean isDropOp, List<Index> alterInvertedIndexes) {
this.isDropOp = isDropOp;
this.alterInvertedIndexes = alterInvertedIndexes;
}
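    // Returns true if the given request has the same ADD/DROP direction as this job and
    // shares at least one inverted index id with it, i.e. it would be a duplicate change.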
public boolean hasSameAlterInvertedIndex(boolean isDropOp, List<Index> inputAlterInvertedIndexes) {
if (this.isDropOp == isDropOp) {
for (Index inputIndex : inputAlterInvertedIndexes) {
for (Index existIndex : this.alterInvertedIndexes) {
if (inputIndex.getIndexId() == existIndex.getIndexId()) {
return true;
}
}
}
}
return false;
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
public String getTableName() {
return tableName;
}
public String getPartitionName() {
return partitionName;
}
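    // A job in a final state is considered expired once its finish time is older than
    // Config.history_job_keep_max_second.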
public boolean isExpire() {
return isDone() && (System.currentTimeMillis() - finishedTimeMs) / 1000 > Config.history_job_keep_max_second;
}
public boolean isDone() {
return jobState.isFinalState();
}
public long getFinishedTimeMs() {
return finishedTimeMs;
}
public void setFinishedTimeMs(long finishedTimeMs) {
this.finishedTimeMs = finishedTimeMs;
}
public boolean isTimeout() {
return System.currentTimeMillis() - createTimeMs > timeoutMs;
}
    /**
     * The keyword 'synchronized' only protects two methods: run() and cancel().
     * Only these two methods can be accessed by different threads (the internal worker thread
     * and the user connection thread), so 'synchronized' ensures that only one thread can run
     * the job at a time.
     *
     * lock order:
     *      synchronized
     *      db lock
     */
public synchronized void run() {
if (isTimeout()) {
cancelImpl("Timeout");
return;
}
try {
switch (jobState) {
case WAITING_TXN:
runWaitingTxnJob();
break;
case RUNNING:
runRunningJob();
break;
default:
break;
}
} catch (AlterCancelException e) {
cancelImpl(e.getMessage());
}
}
public final synchronized boolean cancel(String errMsg) {
return cancelImpl(errMsg);
}
    /**
     * Should be called before executing the job.
     * Returns false if the table is not stable.
     */
protected boolean checkTableStable(OlapTable tbl) throws AlterCancelException {
tbl.writeLockOrAlterCancelException();
try {
boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
Env.getCurrentEnv().getTabletScheduler());
if (!isStable) {
errMsg = "table is unstable";
LOG.warn("wait table {} to be stable before doing index change job", tableId);
return false;
} else {
// table is stable
LOG.info("table {} is stable, start index change job {}", tableId, jobId);
errMsg = "";
return true;
}
} finally {
tbl.writeUnlock();
}
}
    // Check whether all transactions of the given database whose txnId is less than 'watershedTxnId' have finished.
protected boolean isPreviousLoadFinished() throws AnalysisException {
return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
watershedTxnId, dbId, Lists.newArrayList(tableId));
}
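    /**
     * WAITING_TXN phase: once all transactions started before watershedTxnId have finished and the
     * table is stable, build one AlterInvertedIndexTask per replica of every tablet of the origin
     * index in the target partition, submit the batch to the backends, and move the job to RUNNING.
     */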
protected void runWaitingTxnJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.WAITING_TXN, jobState);
try {
if (!isPreviousLoadFinished()) {
LOG.info("wait transactions before {} to be finished, inverted index job: {}", watershedTxnId, jobId);
return;
}
} catch (AnalysisException e) {
throw new AlterCancelException(e.getMessage());
}
LOG.info("previous transactions are all finished, begin to send build or delete inverted index file tasks."
+ "job: {}, is delete: {}", jobId, isDropOp);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable olapTable;
try {
olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
if (!checkTableStable(olapTable)) {
return;
}
olapTable.readLock();
try {
List<Column> originSchemaColumns = olapTable.getSchemaByIndexId(originIndexId, true);
for (Column col : originSchemaColumns) {
TColumn tColumn = col.toThrift();
col.setIndexFlag(tColumn, olapTable);
}
int originSchemaHash = olapTable.getSchemaHashByIndexId(originIndexId);
Partition partition = olapTable.getPartition(partitionId);
MaterializedIndex origIdx = partition.getIndex(originIndexId);
for (Tablet originTablet : origIdx.getTablets()) {
long taskSignature = Env.getCurrentEnv().getNextId();
long originTabletId = originTablet.getId();
List<Replica> originReplicas = originTablet.getReplicas();
for (Replica originReplica : originReplicas) {
if (originReplica.getBackendIdWithoutException() < 0) {
LOG.warn("replica:{}, backendId: {}", originReplica,
originReplica.getBackendIdWithoutException());
throw new AlterCancelException("originReplica:" + originReplica.getId()
+ " backendId < 0");
}
AlterInvertedIndexTask alterInvertedIndexTask = new AlterInvertedIndexTask(
originReplica.getBackendIdWithoutException(), db.getId(), olapTable.getId(),
partitionId, originIndexId, originTabletId,
originSchemaHash, olapTable.getIndexes(),
alterInvertedIndexes, originSchemaColumns,
isDropOp, taskSignature, jobId);
invertedIndexBatchTask.addTask(alterInvertedIndexTask);
}
} // end for tablet
LOG.info("invertedIndexBatchTask:{}", invertedIndexBatchTask);
AgentTaskQueue.addBatchTask(invertedIndexBatchTask);
if (!FeConstants.runningUnitTest) {
AgentTaskExecutor.submit(invertedIndexBatchTask);
}
} finally {
olapTable.readUnlock();
}
this.jobState = JobState.RUNNING;
        // DO NOT write the edit log here; tasks will be sent again if the FE restarts or the master changes.
LOG.info("transfer inverted index job {} state to {}", jobId, this.jobState);
}
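    /**
     * RUNNING phase: poll the batch task. Failures with OBTAIN_LOCK_FAILED are tolerated up to
     * MAX_FAILED_NUM attempts, other failures only up to MIN_FAILED_NUM; exceeding the threshold
     * cancels the job. When all tasks have finished, the job is marked FINISHED and the state
     * change is persisted to the edit log.
     */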
protected void runRunningJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.RUNNING, jobState);
        // Must check whether the db and table still exist first:
        // if the table has been dropped, the tasks will never finish
        // and the job would stay in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
try {
db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
if (!invertedIndexBatchTask.isFinished()) {
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() >= MIN_FAILED_NUM) {
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
                    // If the error is OBTAIN_LOCK_FAILED,
                    // allow more retries before cancelling the job.
if (task.getErrorCode().equals(TStatusCode.OBTAIN_LOCK_FAILED)) {
if (task.getFailedTimes() < MAX_FAILED_NUM) {
continue;
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MAX_FAILED_NUM + ", error: " + task.getErrorMsg());
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MIN_FAILED_NUM + ", error: " + task.getErrorMsg());
}
}
return;
}
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("inverted index job finished: {}", jobId);
}
    /**
     * cancelImpl() can be called at any time from any place.
     * We need to clean up any possible residue of this job.
     */
protected boolean cancelImpl(String errMsg) {
if (jobState.isFinalState()) {
return false;
}
cancelInternal();
jobState = JobState.CANCELLED;
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
return true;
}
private void cancelInternal() {
        // clear remaining agent tasks, if any
AgentTaskQueue.removeBatchTask(invertedIndexBatchTask, TTaskType.ALTER_INVERTED_INDEX);
        // TODO: maybe delete index files that have already been built
}
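    /**
     * Replay a persisted job from the edit log, e.g. on a non-master FE or after a restart.
     * Only the persisted state is restored; a WAITING_TXN job will re-send its tasks when it runs again.
     */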
    public void replay(IndexChangeJob replayedJob) {
        try {
            switch (replayedJob.jobState) {
                case WAITING_TXN:
                    replayCreateJob(replayedJob);
                    break;
                case FINISHED:
                    replayRunningJob(replayedJob);
                    break;
                case CANCELLED:
                    replayCancelled(replayedJob);
                    break;
                default:
                    break;
            }
} catch (MetaNotFoundException e) {
LOG.warn("[INCONSISTENT META] replay inverted index job failed {}", replayedJob.getJobId(), e);
}
}
private void replayCreateJob(IndexChangeJob replayedJob) throws MetaNotFoundException {
        // nothing else to restore here; the inverted index tasks will be re-sent to BE when the job runs again
this.watershedTxnId = replayedJob.watershedTxnId;
this.jobState = JobState.WAITING_TXN;
LOG.info("replay waiting_txn inverted index job: {}, table id: {}", jobId, tableId);
}
private void replayRunningJob(IndexChangeJob replayedJob) {
        // nothing to re-send; just mark the inverted index job as finished
this.jobState = JobState.FINISHED;
this.finishedTimeMs = replayedJob.finishedTimeMs;
LOG.info("replay finished inverted index job: {} table id: {}", jobId, tableId);
}
/**
* Replay job in CANCELLED state.
*/
private void replayCancelled(IndexChangeJob replayedJob) {
cancelInternal();
this.jobState = JobState.CANCELLED;
this.errMsg = replayedJob.errMsg;
this.finishedTimeMs = replayedJob.finishedTimeMs;
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
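    // Deserialization: metadata written before FeMetaVersion.VERSION_122 used the raw Writable
    // format (see readFields); newer metadata is stored as a GSON JSON string.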
public static IndexChangeJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
IndexChangeJob job = new IndexChangeJob();
job.readFields(in);
return job;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, IndexChangeJob.class);
}
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this, IndexChangeJob.class);
Text.writeString(out, json);
}
protected void readFields(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
jobId = in.readLong();
jobState = JobState.valueOf(Text.readString(in));
dbId = in.readLong();
tableId = in.readLong();
tableName = Text.readString(in);
partitionId = in.readLong();
partitionName = Text.readString(in);
errMsg = Text.readString(in);
createTimeMs = in.readLong();
finishedTimeMs = in.readLong();
watershedTxnId = in.readLong();
isDropOp = in.readBoolean();
alterInvertedIndexes = Lists.newArrayList();
int alterInvertedIndexesSize = in.readInt();
for (int i = 0; i < alterInvertedIndexesSize; ++i) {
Index alterIndex = Index.read(in);
alterInvertedIndexes.add(alterIndex);
}
originIndexId = in.readLong();
invertedIndexBatchTask = new AgentBatchTask();
}
}
    public String getAlterInvertedIndexesInfo() {
        List<String> infoList = Lists.newArrayList();
        for (Index invertedIndex : alterInvertedIndexes) {
            infoList.add("[" + (isDropOp ? "DROP " : "ADD ") + invertedIndex.toString() + "]");
        }
        return Joiner.on(", ").join(infoList);
    }
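    // Fill one display row for this job: id, table/partition, index changes, timestamps,
    // watershed txn id, state, error message and task progress (finished/total).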
public void getInfo(List<List<Comparable>> infos) {
        // calc progress first. all indexes share the same progress
String progress = FeConstants.null_string;
if (jobState == JobState.RUNNING && invertedIndexBatchTask.getTaskNum() > 0) {
progress = invertedIndexBatchTask.getFinishedTaskNum() + "/" + invertedIndexBatchTask.getTaskNum();
}
List<Comparable> info = Lists.newArrayList();
info.add(jobId);
info.add(tableName);
info.add(partitionName);
info.add(getAlterInvertedIndexesInfo());
info.add(TimeUtils.longToTimeStringWithms(createTimeMs));
info.add(TimeUtils.longToTimeStringWithms(finishedTimeMs));
info.add(watershedTxnId);
info.add(jobState.name());
info.add(errMsg);
info.add(progress);
infos.add(info);
}
}