OlapTableStreamUpdate.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.stream;
import org.apache.doris.common.UserException;
import org.apache.doris.transaction.TransactionCommitFailedException;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import java.util.HashMap;
import java.util.Map;
public class OlapTableStreamUpdate extends AbstractTableStreamUpdate {
// previous partition consumed version
@SerializedName(value = "prev")
private final Map<Long, Long> prev;
// new partition consumed version
@SerializedName(value = "next")
private final Map<Long, Long> next;
public OlapTableStreamUpdate() {
this.prev = new HashMap<>();
this.next = new HashMap<>();
}
public OlapTableStreamUpdate(Map<Long, Long> prev, Map<Long, Long> next) {
this.prev = prev;
this.next = next;
}
public Map<Long, Long> getNext() {
return next;
}
public Map<Long, Long> getPrev() {
return prev;
}
@Override
public void merge(AbstractTableStreamUpdate other) {
Preconditions.checkArgument(other instanceof OlapTableStreamUpdate);
this.next.putAll(((OlapTableStreamUpdate) other).getNext());
this.prev.putAll(((OlapTableStreamUpdate) other).getPrev());
}
public void checkPartitionOffset(String dbName, String streamName, Map<Long, Long> historicalPartitionOffset,
Map<Long, Long> partitionOffset)
throws UserException {
for (Map.Entry<Long, Long> entry : next.entrySet()) {
if (historicalPartitionOffset.containsKey(entry.getKey())) {
if (!historicalPartitionOffset.get(entry.getKey()).equals(entry.getValue())) {
throw new TransactionCommitFailedException("history offset already consumed: "
+ dbName + '-' + streamName + '-' + entry.getKey() + '-' + entry.getValue()
+ " vs " + historicalPartitionOffset.get(entry.getKey()));
}
} else if (partitionOffset.containsKey(entry.getKey())) {
if (!prev.containsKey(entry.getKey())) {
throw new TransactionCommitFailedException(
"previous version missing for partition=" + entry.getKey() + ", db=" + dbName
+ ", stream=" + streamName);
}
if (!partitionOffset.get(entry.getKey()).equals(prev.get(entry.getKey()))) {
throw new TransactionCommitFailedException("target offset already consumed: "
+ dbName + '-' + streamName + '-' + entry.getKey() + '-' + entry.getValue()
+ " vs " + partitionOffset.get(entry.getKey()));
}
} else {
// the new partition id may not be updated in time, so key in table stream not exist is also valid
}
}
}
}