UpsertRecord.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.transaction.PartitionCommitInfo;
import org.apache.doris.transaction.TableCommitInfo;
import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class UpsertRecord {
public static class TableRecord {
public static class PartitionRecord {
@SerializedName(value = "partitionId")
public long partitionId;
@SerializedName(value = "range")
private String range;
@SerializedName(value = "version")
public long version;
@SerializedName(value = "isTempPartition")
public boolean isTemp;
@SerializedName(value = "stid")
public long subTxnId;
}
@SerializedName(value = "partitionRecords")
private List<PartitionRecord> partitionRecords;
@SerializedName(value = "indexIds")
private Set<Long> indexIds;
@SerializedName(value = "deltaRows")
private Map<Long, Long> deltaRows; // tablet id -> delta rows, null if not exist
public TableRecord(Set<Long> indexIds, Map<Long, Long> deltaRows) {
partitionRecords = Lists.newArrayList();
this.indexIds = indexIds;
this.deltaRows = deltaRows;
}
private void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
addPartitionRecord(-1, partitionCommitInfo);
}
private void addPartitionRecord(long subTxnId, PartitionCommitInfo partitionCommitInfo) {
PartitionRecord partitionRecord = new PartitionRecord();
partitionRecord.subTxnId = subTxnId;
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
partitionRecord.isTemp = partitionCommitInfo.isTempPartition();
partitionRecords.add(partitionRecord);
}
public List<PartitionRecord> getPartitionRecords() {
return partitionRecords;
}
}
@SerializedName(value = "commitSeq")
private long commitSeq;
// record the transaction state
// (label, db, table, [shard_id, partition_id, index_id, version, version_hash])
@SerializedName(value = "txnId")
private long txnId;
@SerializedName(value = "timeStamp")
private long timeStamp;
@SerializedName(value = "label")
private String label;
@SerializedName(value = "dbId")
private long dbId;
// pair is (tableId, tableRecord)
@SerializedName(value = "tableRecords")
private Map<Long, TableRecord> tableRecords;
@SerializedName(value = "stids")
private List<Long> subTxnIds;
// construct from TransactionState
public UpsertRecord(long commitSeq, TransactionState state) {
this.commitSeq = commitSeq;
txnId = state.getTransactionId();
timeStamp = state.getFinishTime();
label = state.getLabel();
dbId = state.getDbId();
tableRecords = Maps.newHashMap();
Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
Map<Long, Map<Long, Long>> tabletDeltaRows = state.getTableIdToTabletDeltaRows();
if (tabletDeltaRows == null) {
tabletDeltaRows = Maps.newHashMap();
}
final Map<Long, Map<Long, Long>> finalTabletDeltaRows = tabletDeltaRows;
if (state.getSubTxnIds() != null) {
state.getSubTxnIdToTableCommitInfo().forEach((subTxnId, tableCommitInfo) -> {
Set<Long> indexIds = loadedTableIndexIds.get(tableCommitInfo.getTableId());
Map<Long, Long> deltaRows = finalTabletDeltaRows.get(tableCommitInfo.getTableId());
TableRecord tableRecord = tableRecords.compute(tableCommitInfo.getTableId(),
(k, v) -> v == null ? new TableRecord(indexIds, deltaRows) : v);
for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
tableRecord.addPartitionRecord(subTxnId, partitionCommitInfo);
}
});
subTxnIds = state.getSubTxnIds();
} else {
for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
Map<Long, Long> deltaRows = tabletDeltaRows.get(info.getTableId());
TableRecord tableRecord = new TableRecord(indexIds, deltaRows);
tableRecords.put(info.getTableId(), tableRecord);
for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
tableRecord.addPartitionRecord(partitionCommitInfo);
}
}
}
}
public long getTimestamp() {
return timeStamp;
}
public long getDbId() {
return dbId;
}
public long getCommitSeq() {
return commitSeq;
}
public List<Long> getAllReleatedTableIds() {
return new ArrayList<>(tableRecords.keySet());
}
public Map<Long, TableRecord> getTableRecords() {
return tableRecords;
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
public static UpsertRecord fromJson(String json) {
return GsonUtils.GSON.fromJson(json, UpsertRecord.class);
}
@Override
public String toString() {
return toJson();
}
}