KinesisProgress.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKinesisRLTaskProgress;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* Progress tracking for Kinesis Routine Load jobs.
*
* Kinesis uses sequence numbers instead of offsets like Kafka.
* A sequence number is a unique identifier for each record within a shard.
* Sequence numbers are string representations of 128-bit integers.
*
* Special position values:
* - TRIM_HORIZON: Start from the oldest record in the shard
* - LATEST: Start from the newest record (records arriving after the iterator is created)
* - AT_TIMESTAMP: Start from a specific timestamp
* - Specific sequence number: Start from or after a specific sequence number
*/
public class KinesisProgress extends RoutineLoadProgress {
private static final Logger LOG = LogManager.getLogger(KinesisProgress.class);
// Special position constants
public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON";
public static final String POSITION_LATEST = "LATEST";
public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP";
// Internal representation for special positions
// Using negative values since sequence numbers are always positive
public static final String TRIM_HORIZON_VAL = "-2";
public static final String LATEST_VAL = "-1";
/**
* Map from shard ID to sequence number.
* The sequence number saved here is the next sequence number to be consumed.
*
* Note: Unlike Kafka partitions which are integers, Kinesis shard IDs are strings
* like "shardId-000000000000".
*/
@SerializedName(value = "shardToSeqNum")
private ConcurrentMap<String, String> shardIdToSequenceNumber = Maps.newConcurrentMap();
// MillisBehindLatest per shard, reported by BE from GetRecords response.
// Not persisted — refreshed every task commit. Used only for lag display and scheduling.
private ConcurrentMap<String, Long> shardIdToMillsBehindLatest = Maps.newConcurrentMap();
// Set of shard IDs that have been closed (split/merge) during consumption.
// Not persisted — only used during task commit to remove closed shards from tracking.
private java.util.Set<String> closedShardIds = new java.util.HashSet<>();
private transient ReentrantLock lock = new ReentrantLock(true);
public KinesisProgress() {
super(LoadDataSourceType.KINESIS);
}
public KinesisProgress(TKinesisRLTaskProgress tKinesisRLTaskProgress) {
super(LoadDataSourceType.KINESIS);
this.shardIdToSequenceNumber = new ConcurrentHashMap<>();
if (tKinesisRLTaskProgress.getShardCmtSeqNum() != null) {
shardIdToSequenceNumber.putAll(tKinesisRLTaskProgress.getShardCmtSeqNum());
}
if (tKinesisRLTaskProgress.isSetShardMillsBehindLatest()) {
this.shardIdToMillsBehindLatest = new ConcurrentHashMap<>(
tKinesisRLTaskProgress.getShardMillsBehindLatest());
}
if (tKinesisRLTaskProgress.isSetClosedShardIds()) {
this.closedShardIds = new java.util.HashSet<>(tKinesisRLTaskProgress.getClosedShardIds());
}
}
public KinesisProgress(Map<String, String> shardIdToSequenceNumber) {
super(LoadDataSourceType.KINESIS);
this.shardIdToSequenceNumber = new ConcurrentHashMap<>();
this.shardIdToSequenceNumber.putAll(shardIdToSequenceNumber);
}
public KinesisProgress(ConcurrentMap<String, String> shardIdToSequenceNumber) {
super(LoadDataSourceType.KINESIS);
this.shardIdToSequenceNumber = shardIdToSequenceNumber;
}
/**
* Get sequence numbers for specified shard IDs.
*/
public ConcurrentMap<String, String> getShardIdToSequenceNumber(List<String> shardIds) {
ConcurrentMap<String, String> result = Maps.newConcurrentMap();
for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) {
for (String shardId : shardIds) {
if (entry.getKey().equals(shardId)) {
result.put(shardId, entry.getValue());
}
}
}
return result;
}
/**
* Add a shard with its starting position.
*/
public void addShardPosition(Pair<String, String> shardPosition) {
ReentrantLock progressLock = getLock();
progressLock.lock();
try {
shardIdToSequenceNumber.put(shardPosition.first, shardPosition.second);
} finally {
progressLock.unlock();
}
}
/**
* Get the sequence number for a specific shard.
*/
public String getSequenceNumberByShard(String shardId) {
return shardIdToSequenceNumber.get(shardId);
}
/**
* Get all shard to sequence number mappings.
*/
public ConcurrentMap<String, String> getShardIdToSequenceNumber() {
return shardIdToSequenceNumber;
}
/**
* Get all shard to MillisBehindLatest mappings (from the last task commit).
*/
public ConcurrentMap<String, Long> getShardIdToMillsBehindLatest() {
return shardIdToMillsBehindLatest;
}
/**
* Get the set of closed shard IDs.
*/
public java.util.Set<String> getClosedShardIds() {
return closedShardIds;
}
/**
* Check if the progress contains a specific shard.
*/
public boolean containsShard(String shardId) {
return shardIdToSequenceNumber.containsKey(shardId);
}
/**
* Check if any shards are being tracked.
*/
public boolean hasShards() {
return !shardIdToSequenceNumber.isEmpty();
}
/**
* Get human-readable progress information.
*/
private void getReadableProgress(ConcurrentMap<String, String> showShardIdToPosition) {
for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) {
String position = entry.getValue();
if (TRIM_HORIZON_VAL.equals(position)) {
showShardIdToPosition.put(entry.getKey(), POSITION_TRIM_HORIZON);
} else if (LATEST_VAL.equals(position)) {
showShardIdToPosition.put(entry.getKey(), POSITION_LATEST);
} else {
// For actual sequence numbers, show the last consumed sequence number
showShardIdToPosition.put(entry.getKey(), position);
}
}
}
/**
* Check that all specified shards exist in the progress.
*/
public void checkShards(List<Pair<String, String>> kinesisShardPositions) throws DdlException {
for (Pair<String, String> pair : kinesisShardPositions) {
if (!shardIdToSequenceNumber.containsKey(pair.first)) {
throw new DdlException("The specified shard " + pair.first + " is not in the consumed shards");
}
}
}
/**
* Modify the position for specific shards.
*/
public void modifyPosition(List<Pair<String, String>> kinesisShardPositions) {
ReentrantLock progressLock = getLock();
progressLock.lock();
try {
for (Pair<String, String> pair : kinesisShardPositions) {
shardIdToSequenceNumber.put(pair.first, pair.second);
}
} finally {
progressLock.unlock();
}
}
/**
* Get shard ID and position pairs.
*/
public List<Pair<String, String>> getShardPositionPairs(boolean alreadyConsumed) {
List<Pair<String, String>> pairs = Lists.newArrayList();
for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) {
String position = entry.getValue();
if (TRIM_HORIZON_VAL.equals(position)) {
pairs.add(Pair.of(entry.getKey(), POSITION_TRIM_HORIZON));
} else if (LATEST_VAL.equals(position)) {
pairs.add(Pair.of(entry.getKey(), POSITION_LATEST));
} else {
// For actual sequence numbers
pairs.add(Pair.of(entry.getKey(), position));
}
}
return pairs;
}
/**
* Calculate lag for each shard.
* Note: Kinesis lag calculation is more complex than Kafka because:
* 1. Sequence numbers are strings, not comparable integers
* 2. GetRecords API returns millisBehindLatest which is more useful
*
* This method returns -1 for shards where lag cannot be calculated.
*/
public Map<String, Long> getLag(Map<String, Long> shardIdWithMillsBehindLatest) {
Map<String, Long> lagMap = Maps.newHashMap();
for (String shardId : shardIdToSequenceNumber.keySet()) {
Long lag = shardIdWithMillsBehindLatest.get(shardId);
lagMap.put(shardId, lag != null ? lag : -1L);
}
return lagMap;
}
@Override
public String toString() {
ConcurrentMap<String, String> showShardIdToPosition = Maps.newConcurrentMap();
getReadableProgress(showShardIdToPosition);
return "KinesisProgress [shardIdToSequenceNumber="
+ Joiner.on("|").withKeyValueSeparator("_").join(showShardIdToPosition) + "]";
}
@Override
public String toJsonString() {
ConcurrentMap<String, String> showShardIdToPosition = Maps.newConcurrentMap();
getReadableProgress(showShardIdToPosition);
Gson gson = new Gson();
return gson.toJson(showShardIdToPosition);
}
@Override
public void update(RLTaskTxnCommitAttachment attachment) {
KinesisProgress newProgress = (KinesisProgress) attachment.getProgress();
ReentrantLock progressLock = getLock();
progressLock.lock();
try {
// Update sequence numbers for each shard
newProgress.shardIdToSequenceNumber.forEach((shardId, newSeqNum) -> {
this.shardIdToSequenceNumber.put(shardId, newSeqNum);
});
// Update MillisBehindLatest
if (newProgress.getShardIdToMillsBehindLatest() != null) {
this.shardIdToMillsBehindLatest.putAll(newProgress.getShardIdToMillsBehindLatest());
}
// Remove closed shards from tracking
if (newProgress.getClosedShardIds() != null && !newProgress.getClosedShardIds().isEmpty()) {
for (String closedShardId : newProgress.getClosedShardIds()) {
this.shardIdToSequenceNumber.remove(closedShardId);
this.shardIdToMillsBehindLatest.remove(closedShardId);
LOG.info("Removed closed shard from progress: {}", closedShardId);
}
// Store closed shard IDs for Job to clean up consumingClosedShards
this.closedShardIds.addAll(newProgress.getClosedShardIds());
}
} finally {
progressLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("update kinesis progress: {}, task: {}, job: {}",
newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
}
}
/**
* Get total progress as the count of tracked shards.
* Unlike Kafka where we sum offsets, Kinesis sequence numbers are not additive.
*/
public Long totalProgress() {
// Return the number of shards being tracked
// Actual progress is better represented by millisBehindLatest
return (long) shardIdToSequenceNumber.size();
}
private ReentrantLock getLock() {
if (lock == null) {
lock = new ReentrantLock(true);
}
return lock;
}
}