TransactionState.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TransactionState implements Writable {
private static final Logger LOG = LogManager.getLogger(TransactionState.class);
// compare the TransactionState by txn id, desc
public static class TxnStateComparator implements Comparator<TransactionState> {
@Override
public int compare(TransactionState t1, TransactionState t2) {
return Long.compare(t2.getTransactionId(), t1.getTransactionId());
}
}
public static final TxnStateComparator TXN_ID_COMPARATOR = new TxnStateComparator();
public enum LoadJobSourceType {
FRONTEND(1), // old dpp load, mini load, insert stmt(not streaming type) use this type
BACKEND_STREAMING(2), // streaming load use this type
INSERT_STREAMING(3), // insert stmt (streaming type), update stmt use this type
ROUTINE_LOAD_TASK(4), // routine load task use this type
BATCH_LOAD_JOB(5); // load job v2 for broker load
@SerializedName("f")
private final int flag;
private LoadJobSourceType(int flag) {
this.flag = flag;
}
public int value() {
return flag;
}
public static LoadJobSourceType valueOf(int flag) {
switch (flag) {
case 1:
return FRONTEND;
case 2:
return BACKEND_STREAMING;
case 3:
return INSERT_STREAMING;
case 4:
return ROUTINE_LOAD_TASK;
case 5:
return BATCH_LOAD_JOB;
default:
return null;
}
}
}
public enum TxnStatusChangeReason {
DB_DROPPED,
TIMEOUT,
OFFSET_OUT_OF_RANGE,
PAUSE,
NO_PARTITIONS,
INVALID_JSON_PATH;
public static TxnStatusChangeReason fromString(String reasonString) {
for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {
if (reasonString.contains(txnStatusChangeReason.toString())) {
return txnStatusChangeReason;
}
}
return null;
}
@Override
public String toString() {
switch (this) {
case OFFSET_OUT_OF_RANGE:
return "Offset out of range";
case NO_PARTITIONS:
return "all partitions have no load data";
default:
return this.name();
}
}
}
public enum TxnSourceType {
FE(1),
BE(2);
public int value() {
return flag;
}
private int flag;
TxnSourceType(int flag) {
this.flag = flag;
}
public static TxnSourceType valueOf(int flag) {
switch (flag) {
case 1:
return FE;
case 2:
return BE;
default:
return null;
}
}
}
public static class TxnCoordinator {
@SerializedName(value = "sourceType")
public TxnSourceType sourceType;
// backendId for backend, 0 for frontend
@SerializedName(value = "id")
public long id = 0;
@SerializedName(value = "ip")
public String ip;
// frontend/backend start time
@SerializedName(value = "st")
public long startTime = 0;
// True if this txn if created by system(such as writing data to audit table)
@SerializedName(value = "ii")
public boolean isFromInternal = false;
public TxnCoordinator() {
}
public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) {
this.sourceType = sourceType;
this.id = id;
this.ip = ip;
this.startTime = startTime;
}
@Override
public String toString() {
return sourceType.toString() + ": " + ip;
}
}
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableIdList")
@Setter
@Getter
private List<Long> tableIdList;
@SerializedName(value = "txnId")
private long transactionId;
@SerializedName(value = "label")
private String label;
// requestId is used to judge whether a begin request is a internal retry request.
// no need to persist it.
private TUniqueId requestId;
@SerializedName(value = "idToTableCommitInfos")
private Map<Long, TableCommitInfo> idToTableCommitInfos;
// coordinator is show who begin this txn (FE, or one of BE, etc...)
@SerializedName(value = "txnCoordinator")
private TxnCoordinator txnCoordinator;
@SerializedName(value = "txnStatus")
private TransactionStatus transactionStatus;
@SerializedName(value = "sourceType")
private LoadJobSourceType sourceType;
@SerializedName(value = "prepareTime")
private long prepareTime;
@SerializedName(value = "preCommitTime")
private long preCommitTime;
@SerializedName(value = "commitTime")
private long commitTime;
@SerializedName(value = "finishTime")
private long finishTime;
@SerializedName(value = "reason")
private String reason = "";
// error replica ids
@SerializedName(value = "errorReplicas")
private Set<Long> errorReplicas;
// this latch will be counted down when txn status change to VISIBLE
private CountDownLatch visibleLatch;
// this state need not be serialized. the map key is backend_id
private Map<Long, List<PublishVersionTask>> publishVersionTasks;
private boolean hasSendTask;
private TransactionStatus preStatus = null;
// When publish txn, if every tablet has at least 1 replica published succ, but not quorum replicas succ,
// and time since firstPublishVersionTime has exceeds Config.publish_wait_time_second,
// then this transaction will become visible.
private long firstPublishVersionTime = -1;
private long lastPublishVersionTime = -1;
private long publishCount = 0;
// txn may try finish many times and generate a lot of log.
// use lastPublishLogTime to reduce log.
private long lastPublishLogTime = 0;
@SerializedName(value = "callbackId")
private long callbackId = -1;
// In the beforeStateTransform() phase, we will get the callback object through the callbackId,
// and if we get it, we will save it in this variable.
// The main function of this variable is to retain a reference to this callback object.
// In order to prevent in the afterStateTransform() phase the callback object may have been removed
// from the CallbackFactory, resulting in the inability to obtain the object, causing some bugs
// such as
// 1. the write lock of callback object has been called in beforeStateTransform()
// 2. callback object has been removed from CallbackFactory
// 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released.
private TxnStateChangeCallback callback = null;
@SerializedName(value = "timeoutMs")
private long timeoutMs = Config.stream_load_default_timeout_second * 1000;
private long preCommittedTimeoutMs = Config.stream_load_default_precommit_timeout_second * 1000;
// is set to true, we will double the publish timeout
private boolean prolongPublishTimeout = false;
// optional
@SerializedName(value = "txnCommitAttachment")
private TxnCommitAttachment txnCommitAttachment;
// this map should be set when load execution begin, so that when the txn commit, it will know
// which tables and rollups it loaded.
// tbl id -> (index ids)
@SerializedName(value = "loadedTblIndexes")
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
/**
* the value is the num delta rows of all replicas in each tablet
*/
@SerializedName(value = "deltaRows")
private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
private String errorLogUrl = null;
// record some error msgs during the transaction operation.
// this msg will be shown in show proc "/transactions/dbId/";
// no need to persist.
private String errMsg = "";
public class SchemaInfo {
public List<Column> schema;
public int schemaVersion;
public SchemaInfo(OlapTable olapTable) {
Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
schema = indexMeta.getSchema();
schemaVersion = indexMeta.getSchemaVersion();
break;
}
}
}
private boolean isPartialUpdate = false;
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
@Getter
@SerializedName(value = "sti")
private List<Long> subTxnIds;
@Getter
@SerializedName(value = "stot")
private Map<Long, TableCommitInfo> subTxnIdToTableCommitInfo = new TreeMap<>();
@Getter
@Setter
private Set<Long> involvedBackends = Sets.newHashSet();
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
// mocked, to avoid NPE
this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis());
this.transactionStatus = TransactionStatus.PREPARE;
this.sourceType = LoadJobSourceType.FRONTEND;
this.prepareTime = -1;
this.preCommitTime = -1;
this.commitTime = -1;
this.finishTime = -1;
this.reason = "";
this.errorReplicas = Sets.newHashSet();
this.publishVersionTasks = Maps.newHashMap();
this.hasSendTask = false;
this.visibleLatch = new CountDownLatch(1);
}
public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
LoadJobSourceType sourceType, TxnCoordinator txnCoordinator, long callbackId, long timeoutMs) {
this.dbId = dbId;
this.tableIdList = (tableIdList == null ? Lists.newArrayList() : tableIdList);
this.transactionId = transactionId;
this.label = label;
this.requestId = requestId;
this.idToTableCommitInfos = Maps.newHashMap();
this.txnCoordinator = txnCoordinator;
this.transactionStatus = TransactionStatus.PREPARE;
this.sourceType = sourceType;
this.prepareTime = -1;
this.preCommitTime = -1;
this.commitTime = -1;
this.finishTime = -1;
this.reason = "";
this.errorReplicas = Sets.newHashSet();
this.publishVersionTasks = Maps.newHashMap();
this.hasSendTask = false;
this.visibleLatch = new CountDownLatch(1);
this.callbackId = callbackId;
this.timeoutMs = timeoutMs;
}
//for TxnInfoPB convert to TransactionState
public TransactionState(long dbId, List<Long> tableIdList, long transactionId, String label, TUniqueId requestId,
LoadJobSourceType sourceType, TxnCoordinator txnCoordinator, TransactionStatus transactionStatus,
String reason, long callbackId, long timeoutMs, TxnCommitAttachment txnCommitAttachment, long prepareTime,
long preCommitTime, long commitTime, long finishTime) {
this(dbId, tableIdList, transactionId, label, requestId, sourceType, txnCoordinator, callbackId, timeoutMs);
this.transactionStatus = transactionStatus;
this.prepareTime = prepareTime;
this.preCommitTime = preCommitTime;
this.commitTime = commitTime;
this.finishTime = finishTime;
this.reason = reason;
this.txnCommitAttachment = txnCommitAttachment;
}
public void addSubTxnTableCommitInfo(SubTransactionState subTransactionState, TableCommitInfo tableCommitInfo) {
subTxnIdToTableCommitInfo.put(subTransactionState.getSubTransactionId(), tableCommitInfo);
}
public void setErrorReplicas(Set<Long> newErrorReplicas) {
this.errorReplicas = newErrorReplicas;
}
public void addPublishVersionTask(Long backendId, PublishVersionTask task) {
if (this.subTxnIds == null) {
this.publishVersionTasks.put(backendId, Lists.newArrayList(task));
} else {
this.publishVersionTasks.computeIfAbsent(backendId, k -> Lists.newArrayList()).add(task);
}
}
public void setSendedTask() {
this.hasSendTask = true;
updateSendTaskTime();
}
public void updateSendTaskTime() {
this.publishCount++;
this.lastPublishVersionTime = System.currentTimeMillis();
if (this.firstPublishVersionTime <= 0) {
this.firstPublishVersionTime = lastPublishVersionTime;
}
}
public long getFirstPublishVersionTime() {
return firstPublishVersionTime;
}
public long getLastPublishVersionTime() {
return this.lastPublishVersionTime;
}
public long getPublishCount() {
return publishCount;
}
public boolean hasSendTask() {
return this.hasSendTask;
}
public TUniqueId getRequestId() {
return requestId;
}
public void setTransactionId(long transactionId) {
this.transactionId = transactionId;
}
public long getTransactionId() {
return transactionId;
}
public String getLabel() {
return this.label;
}
public TxnCoordinator getCoordinator() {
return txnCoordinator;
}
public TransactionStatus getTransactionStatus() {
return transactionStatus;
}
public long getPrepareTime() {
return prepareTime;
}
public long getPreCommitTime() {
return preCommitTime;
}
public long getCommitTime() {
return commitTime;
}
public long getFinishTime() {
return finishTime;
}
public String getReason() {
return reason;
}
public TransactionStatus getPreStatus() {
return this.preStatus;
}
public TxnCommitAttachment getTxnCommitAttachment() {
return txnCommitAttachment;
}
public long getCallbackId() {
return callbackId;
}
public long getTimeoutMs() {
return timeoutMs;
}
public void setErrorLogUrl(String errorLogUrl) {
this.errorLogUrl = errorLogUrl;
}
public String getErrorLogUrl() {
return errorLogUrl;
}
public long getLastPublishLogTime() {
return lastPublishLogTime;
}
public void setLastPublishLogTime(long lastPublishLogTime) {
this.lastPublishLogTime = lastPublishLogTime;
}
public void setTransactionStatus(TransactionStatus transactionStatus) {
// status changed
this.preStatus = this.transactionStatus;
this.transactionStatus = transactionStatus;
// after status changed
if (transactionStatus == TransactionStatus.VISIBLE) {
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
}
} else if (transactionStatus == TransactionStatus.ABORTED) {
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
}
}
}
public void beforeStateTransform(TransactionStatus transactionStatus) throws TransactionException {
// before status changed
callback = Env.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId);
if (callback != null) {
switch (transactionStatus) {
case ABORTED:
callback.beforeAborted(this);
break;
case COMMITTED:
callback.beforeCommitted(this);
break;
default:
break;
}
} else if (callback == null && callbackId > 0) {
switch (transactionStatus) {
case COMMITTED:
// Maybe listener has been deleted. The txn need to be aborted later.
throw new TransactionException(
"Failed to commit txn when callback " + callbackId + "could not be found");
default:
break;
}
}
}
public void afterStateTransform(TransactionStatus transactionStatus, boolean txnOperated) throws UserException {
afterStateTransform(transactionStatus, txnOperated, null);
}
public void afterStateTransform(TransactionStatus transactionStatus,
boolean txnOperated, String txnStatusChangeReason) throws UserException {
// after status changed
if (callback == null) {
callback = Env.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId);
}
if (callback != null) {
switch (transactionStatus) {
case ABORTED:
callback.afterAborted(this, txnOperated, txnStatusChangeReason);
break;
case COMMITTED:
callback.afterCommitted(this, txnOperated);
break;
case VISIBLE:
callback.afterVisible(this, txnOperated);
break;
default:
break;
}
}
}
public void replaySetTransactionStatus() {
TxnStateChangeCallback callback = Env.getCurrentGlobalTransactionMgr().getCallbackFactory()
.getCallback(callbackId);
if (callback != null) {
if (transactionStatus == TransactionStatus.ABORTED) {
callback.replayOnAborted(this);
} else if (transactionStatus == TransactionStatus.COMMITTED) {
callback.replayOnCommitted(this);
} else if (transactionStatus == TransactionStatus.VISIBLE) {
callback.replayOnVisible(this);
}
}
}
public void countdownVisibleLatch() {
this.visibleLatch.countDown();
}
public void waitTransactionVisible(long timeoutMillis) throws InterruptedException {
this.visibleLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
}
public void setPrepareTime(long prepareTime) {
this.prepareTime = prepareTime;
}
public void setPreCommitTime(long preCommitTime) {
this.preCommitTime = preCommitTime;
}
public void setCommitTime(long commitTime) {
this.commitTime = commitTime;
}
public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
}
public void setReason(String reason) {
this.reason = Strings.nullToEmpty(reason);
}
public Set<Long> getErrorReplicas() {
return this.errorReplicas;
}
public long getDbId() {
return dbId;
}
// TODO should we add a lock between addTableId, removeTableId and getTableIdList?
public void addTableId(long tableId) {
this.tableIdList.add(tableId);
}
public void removeTableId(long tableId) {
Preconditions.checkState(this.tableIdList.size() > 0, "table id list is empty");
Preconditions.checkState(this.tableIdList.get(this.tableIdList.size() - 1) == tableId,
"table id is not match, expect: %s, actual: %s", tableId,
this.tableIdList.get(this.tableIdList.size() - 1));
this.tableIdList.remove(this.tableIdList.size() - 1);
}
public List<Long> getTableIdList() {
return tableIdList;
}
public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
return idToTableCommitInfos;
}
public void putIdToTableCommitInfo(long tableId, TableCommitInfo tableCommitInfo) {
idToTableCommitInfos.put(tableId, tableCommitInfo);
}
public TableCommitInfo getTableCommitInfo(long tableId) {
return this.idToTableCommitInfos.get(tableId);
}
public void setTxnCommitAttachment(TxnCommitAttachment txnCommitAttachment) {
this.txnCommitAttachment = txnCommitAttachment;
}
// return true if txn is in final status and label is expired
public boolean isExpired(long currentMillis) {
if (!transactionStatus.isFinalStatus()) {
return false;
}
long expireTime = Config.label_keep_max_second;
if (isShortTxn()) {
expireTime = Config.streaming_label_keep_max_second;
}
return (currentMillis - finishTime) / 1000 > expireTime;
}
// Return true if this txn is related to streaming load/insert/routine load task.
// We call these tasks "Short" tasks because they will be cleaned up in a short time after they are finished.
public boolean isShortTxn() {
return sourceType == LoadJobSourceType.BACKEND_STREAMING || sourceType == LoadJobSourceType.INSERT_STREAMING
|| sourceType == LoadJobSourceType.ROUTINE_LOAD_TASK;
}
// return true if txn is running but timeout
public boolean isTimeout(long currentMillis) {
return (transactionStatus == TransactionStatus.PREPARE
&& currentMillis - prepareTime > timeoutMs)
|| (transactionStatus == TransactionStatus.PRECOMMITTED
&& currentMillis - preCommitTime > preCommittedTimeoutMs);
}
public synchronized void addTableIndexes(OlapTable table) {
Set<Long> indexIds = loadedTblIndexes.computeIfAbsent(table.getId(), k -> Sets.newHashSet());
// always equal the index ids
indexIds.clear();
indexIds.addAll(table.getIndexIdToMeta().keySet());
}
public Map<Long, Set<Long>> getLoadedTblIndexes() {
return loadedTblIndexes;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("TransactionState. ");
sb.append("transaction id: ").append(transactionId);
sb.append(", label: ").append(label);
sb.append(", db id: ").append(dbId);
sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
sb.append(", callback id: ").append(callbackId);
sb.append(", coordinator: ").append(txnCoordinator);
sb.append(", transaction status: ").append(transactionStatus);
sb.append(", error replicas num: ").append(errorReplicas.size());
sb.append(", replica ids: ").append(Joiner.on(",").join(errorReplicas.stream().limit(5).toArray()));
sb.append(", prepare time: ").append(prepareTime);
sb.append(", commit time: ").append(commitTime);
sb.append(", finish time: ").append(finishTime);
sb.append(", reason: ").append(reason);
if (txnCommitAttachment != null) {
sb.append(", attachment: ").append(txnCommitAttachment);
}
if (idToTableCommitInfos != null && !idToTableCommitInfos.isEmpty()) {
sb.append(", table commit info: ").append(idToTableCommitInfos);
}
if (subTxnIds != null) {
sb.append(", sub txn ids: ").append(subTxnIds);
}
if (!subTxnIdToTableCommitInfo.isEmpty()) {
sb.append(", sub txn table commit info: ").append(subTxnIdToTableCommitInfo);
}
return sb.toString();
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
public LoadJobSourceType getSourceType() {
return sourceType;
}
public Map<Long, List<PublishVersionTask>> getPublishVersionTasks() {
return publishVersionTasks;
}
public boolean isPublishTimeout() {
// the max timeout is Config.publish_version_timeout_second * 2;
long timeoutMillis = Config.publish_version_timeout_second * 1000;
if (prolongPublishTimeout) {
timeoutMillis *= 2;
}
return System.currentTimeMillis() - lastPublishVersionTime > timeoutMillis;
}
public void prolongPublishTimeout() {
this.prolongPublishTimeout = true;
LOG.info("prolong the timeout of publish version task for transaction: {}", transactionId);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static TransactionState read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_132) {
TransactionState transactionState = new TransactionState();
transactionState.readFields(in);
return transactionState;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, TransactionState.class);
}
}
@Deprecated
public void readFields(DataInput in) throws IOException {
transactionId = in.readLong();
label = Text.readString(in);
dbId = in.readLong();
int size = in.readInt();
for (int i = 0; i < size; i++) {
TableCommitInfo info = TableCommitInfo.read(in);
idToTableCommitInfos.put(info.getTableId(), info);
}
txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
transactionStatus = TransactionStatus.valueOf(in.readInt());
sourceType = LoadJobSourceType.valueOf(in.readInt());
prepareTime = in.readLong();
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_107) {
preCommitTime = in.readLong();
}
commitTime = in.readLong();
finishTime = in.readLong();
reason = Text.readString(in);
int errorReplicaNum = in.readInt();
for (int i = 0; i < errorReplicaNum; ++i) {
errorReplicas.add(in.readLong());
}
if (in.readBoolean()) {
txnCommitAttachment = TxnCommitAttachment.read(in);
}
callbackId = in.readLong();
timeoutMs = in.readLong();
tableIdList = Lists.newArrayList();
int tableListSize = in.readInt();
for (int i = 0; i < tableListSize; i++) {
tableIdList.add(in.readLong());
}
}
public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
return tableIdToTabletDeltaRows;
}
public void setTableIdToTabletDeltaRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows) {
this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
}
public void setErrorMsg(String errMsg) {
this.errMsg = errMsg;
}
public void clearErrorMsg() {
this.errMsg = "";
}
public String getErrMsg() {
return this.errMsg;
}
// reduce memory
public void pruneAfterVisible() {
publishVersionTasks.clear();
tableIdToTabletDeltaRows.clear();
involvedBackends.clear();
}
public void setSchemaForPartialUpdate(OlapTable olapTable) {
// the caller should hold the read lock of the table
isPartialUpdate = true;
txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
}
public boolean isPartialUpdate() {
return isPartialUpdate;
}
public SchemaInfo getTxnSchema(long id) {
return txnSchemas.get(id);
}
public boolean checkSchemaCompatibility(OlapTable olapTable) {
SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
if (txnSchemaInfo == null) {
return true;
}
if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
return true;
}
for (Column txnCol : txnSchemaInfo.schema) {
if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
continue;
}
int uniqueId = txnCol.getUniqueId();
Optional<Column> currentCol = currentSchemaInfo.schema.stream()
.filter(col -> col.getUniqueId() == uniqueId).findFirst();
// for now Doris's light schema change only supports adding columns,
// dropping columns, and type conversions that increase the varchar length
if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
LOG.warn("Check schema compatibility failed, txnId={}, table={}",
transactionId, olapTable.getName());
return false;
}
}
}
return true;
}
public void setSubTxnIds(List<Long> subTxnIds) {
this.subTxnIds = subTxnIds;
}
public TableCommitInfo getTableCommitInfoBySubTxnId(long subTxnId) {
return subTxnIdToTableCommitInfo.get(subTxnId);
}
public List<TableCommitInfo> getSubTxnTableCommitInfos() {
List<TableCommitInfo> tableCommitInfos = new ArrayList<>();
for (Long subTxnId : subTxnIds) {
TableCommitInfo tableCommitInfo = subTxnIdToTableCommitInfo.get(subTxnId);
if (tableCommitInfo != null) {
tableCommitInfos.add(tableCommitInfo);
}
}
return tableCommitInfos;
}
}