// KinesisProgress.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKinesisRLTaskProgress;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Progress tracking for Kinesis Routine Load jobs.
 *
 * <p>Kinesis identifies a record inside a shard by a sequence number rather than by a
 * numeric offset as Kafka does. Sequence numbers are handled here as opaque decimal
 * strings: they are never parsed, compared or incremented by this class.
 *
 * <p>Special position values:
 * <ul>
 *   <li>TRIM_HORIZON: start from the oldest record in the shard</li>
 *   <li>LATEST: start from the newest record (records arriving after the iterator is created)</li>
 *   <li>AT_TIMESTAMP: start from a specific timestamp</li>
 *   <li>a specific sequence number: start from or after that sequence number</li>
 * </ul>
 *
 * <p>Writers that touch several shards ({@link #modifyPosition}, {@link #update}) are
 * serialized by an internal lock. Readers go straight to the concurrent maps without taking
 * that lock, so a reader may observe a multi-shard update that is only partially applied.
 */
public class KinesisProgress extends RoutineLoadProgress {
    private static final Logger LOG = LogManager.getLogger(KinesisProgress.class);

    // Gson instances are thread-safe, so a single shared instance replaces a fresh
    // allocation on every toJsonString() call.
    private static final Gson GSON = new Gson();

    // Special position constants (user-facing names)
    public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON";
    public static final String POSITION_LATEST = "LATEST";
    public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP";

    // Internal representation for special positions.
    // Negative values are used so they cannot collide with a real sequence number.
    public static final String TRIM_HORIZON_VAL = "-2";
    public static final String LATEST_VAL = "-1";

    /**
     * Map from shard ID to position.
     *
     * <p>A value is either one of the internal sentinels ({@link #TRIM_HORIZON_VAL},
     * {@link #LATEST_VAL}), a position supplied through {@link #addShardPosition} /
     * {@link #modifyPosition}, or the sequence number taken verbatim from a committed task
     * in {@link #update}. Per the note in {@code update}, the committed value is the last
     * successfully consumed sequence number and is stored without adjustment.
     *
     * <p>Note: unlike Kafka partitions which are integers, Kinesis shard IDs are strings
     * like "shardId-000000000000".
     */
    @SerializedName(value = "shardToSeqNum")
    private ConcurrentMap<String, String> shardIdToSequenceNumber = Maps.newConcurrentMap();

    // MillisBehindLatest per shard, reported by BE from the GetRecords response.
    // Not annotated for serialization; it is refreshed from the task progress in update().
    // Used only for lag display and scheduling.
    private ConcurrentMap<String, Long> shardIdToMillsBehindLatest = Maps.newConcurrentMap();

    // Fair lock serializing the writers of this progress.
    private final ReentrantLock lock = new ReentrantLock(true);

    /**
     * Creates an empty progress that tracks no shard.
     */
    public KinesisProgress() {
        super(LoadDataSourceType.KINESIS);
    }

    /**
     * Creates a progress from the Thrift structure reported by a task.
     *
     * @param tKinesisRLTaskProgress task progress; its committed sequence numbers and, when
     *                               set, its per-shard MillisBehindLatest values are copied
     */
    public KinesisProgress(TKinesisRLTaskProgress tKinesisRLTaskProgress) {
        super(LoadDataSourceType.KINESIS);
        this.shardIdToSequenceNumber = new ConcurrentHashMap<>();
        if (tKinesisRLTaskProgress.getShardCmtSeqNum() != null) {
            shardIdToSequenceNumber.putAll(tKinesisRLTaskProgress.getShardCmtSeqNum());
        }
        if (tKinesisRLTaskProgress.isSetShardMillsBehindLatest()) {
            this.shardIdToMillsBehindLatest = new ConcurrentHashMap<>(
                    tKinesisRLTaskProgress.getShardMillsBehindLatest());
        }
    }

    /**
     * Creates a progress holding a copy of the given shard positions.
     *
     * @param shardIdToSequenceNumber shard ID to position; copied, later changes to the
     *                                argument are not reflected here
     */
    public KinesisProgress(Map<String, String> shardIdToSequenceNumber) {
        super(LoadDataSourceType.KINESIS);
        this.shardIdToSequenceNumber = new ConcurrentHashMap<>();
        this.shardIdToSequenceNumber.putAll(shardIdToSequenceNumber);
    }

    /**
     * Creates a progress backed directly by the given map.
     *
     * @param shardIdToSequenceNumber shard ID to position; NOT copied, so this progress and
     *                                the caller share the same map instance
     */
    public KinesisProgress(ConcurrentMap<String, String> shardIdToSequenceNumber) {
        super(LoadDataSourceType.KINESIS);
        this.shardIdToSequenceNumber = shardIdToSequenceNumber;
    }

    /**
     * Get positions for the specified shard IDs.
     *
     * @param shardIds shard IDs of interest; a null list or null elements are ignored
     * @return a new map containing only the requested shards that are currently tracked
     */
    public ConcurrentMap<String, String> getShardIdToSequenceNumber(List<String> shardIds) {
        ConcurrentMap<String, String> result = Maps.newConcurrentMap();
        if (shardIds == null) {
            return result;
        }
        for (String shardId : shardIds) {
            if (shardId == null) {
                // Concurrent maps reject null keys; a null ID can never be tracked.
                continue;
            }
            // One keyed lookup per requested shard instead of scanning every tracked shard.
            String sequenceNumber = shardIdToSequenceNumber.get(shardId);
            if (sequenceNumber != null) {
                result.put(shardId, sequenceNumber);
            }
        }
        return result;
    }

    /**
     * Add a shard with its starting position, replacing any position already stored for it.
     *
     * @param shardPosition pair of (shard ID, position)
     */
    public void addShardPosition(Pair<String, String> shardPosition) {
        lock.lock();
        try {
            shardIdToSequenceNumber.put(shardPosition.first, shardPosition.second);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Get the position stored for a specific shard.
     *
     * @return the stored position, or null when the shard is not tracked
     */
    public String getSequenceNumberByShard(String shardId) {
        return shardIdToSequenceNumber.get(shardId);
    }

    /**
     * Get all shard to position mappings.
     *
     * @return the live internal map, not a copy
     */
    public ConcurrentMap<String, String> getShardIdToSequenceNumber() {
        return shardIdToSequenceNumber;
    }

    /**
     * Get all shard to MillisBehindLatest mappings (from the last task commit).
     *
     * @return the live internal map, not a copy
     */
    public ConcurrentMap<String, Long> getShardIdToMillsBehindLatest() {
        return shardIdToMillsBehindLatest;
    }

    /**
     * Check if the progress contains a specific shard.
     */
    public boolean containsShard(String shardId) {
        return shardIdToSequenceNumber.containsKey(shardId);
    }

    /**
     * Check if any shards are being tracked.
     */
    public boolean hasShards() {
        return !shardIdToSequenceNumber.isEmpty();
    }

    /**
     * Translate an internally stored position into its display form.
     *
     * @return the special position name for an internal sentinel, otherwise the input as is
     */
    private static String toReadablePosition(String position) {
        if (TRIM_HORIZON_VAL.equals(position)) {
            return POSITION_TRIM_HORIZON;
        }
        if (LATEST_VAL.equals(position)) {
            return POSITION_LATEST;
        }
        // A real sequence number is shown exactly as stored.
        return position;
    }

    /**
     * Fill the given map with human-readable progress information, one entry per tracked shard.
     */
    private void getReadableProgress(ConcurrentMap<String, String> showShardIdToPosition) {
        for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) {
            showShardIdToPosition.put(entry.getKey(), toReadablePosition(entry.getValue()));
        }
    }

    /**
     * Check that all specified shards exist in the progress.
     *
     * @throws DdlException if any of the given shards is not tracked
     */
    public void checkShards(List<Pair<String, String>> kinesisShardPositions) throws DdlException {
        for (Pair<String, String> pair : kinesisShardPositions) {
            if (!shardIdToSequenceNumber.containsKey(pair.first)) {
                throw new DdlException("The specified shard " + pair.first + " is not in the consumed shards");
            }
        }
    }

    /**
     * Modify the position for specific shards. Shards not yet tracked are added.
     *
     * @param kinesisShardPositions pairs of (shard ID, new position)
     */
    public void modifyPosition(List<Pair<String, String>> kinesisShardPositions) {
        lock.lock();
        try {
            for (Pair<String, String> pair : kinesisShardPositions) {
                shardIdToSequenceNumber.put(pair.first, pair.second);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Get shard ID and position pairs, with internal sentinels shown by their special names.
     *
     * @param alreadyConsumed currently ignored: sequence numbers are opaque strings, so no
     *                        adjustment is applied to the stored value
     * @return a new list with one pair per tracked shard
     */
    public List<Pair<String, String>> getShardPositionPairs(boolean alreadyConsumed) {
        List<Pair<String, String>> pairs = Lists.newArrayList();
        for (Map.Entry<String, String> entry : shardIdToSequenceNumber.entrySet()) {
            pairs.add(Pair.of(entry.getKey(), toReadablePosition(entry.getValue())));
        }
        return pairs;
    }

    /**
     * Calculate lag for each tracked shard.
     * Note: Kinesis lag calculation differs from Kafka because:
     * 1. Sequence numbers are strings, not comparable integers
     * 2. GetRecords API returns millisBehindLatest which is more useful
     *
     * @param shardIdWithMillsBehindLatest lag per shard in milliseconds; may be null
     * @return one entry per tracked shard; -1 where no lag value is available
     */
    public Map<String, Long> getLag(Map<String, Long> shardIdWithMillsBehindLatest) {
        Map<String, Long> lagMap = Maps.newHashMap();
        for (String shardId : shardIdToSequenceNumber.keySet()) {
            Long lag = null;
            if (shardIdWithMillsBehindLatest != null) {
                lag = shardIdWithMillsBehindLatest.get(shardId);
            }
            lagMap.put(shardId, lag != null ? lag : -1L);
        }
        return lagMap;
    }

    @Override
    public String toString() {
        ConcurrentMap<String, String> showShardIdToPosition = Maps.newConcurrentMap();
        getReadableProgress(showShardIdToPosition);
        return "KinesisProgress [shardIdToSequenceNumber="
                + Joiner.on("|").withKeyValueSeparator("_").join(showShardIdToPosition) + "]";
    }

    @Override
    public String toJsonString() {
        ConcurrentMap<String, String> showShardIdToPosition = Maps.newConcurrentMap();
        getReadableProgress(showShardIdToPosition);
        return GSON.toJson(showShardIdToPosition);
    }

    /**
     * Merge the progress carried by a committed task into this progress.
     *
     * <p>Only shards present in the task progress are touched; other shards keep their
     * current position and lag value.
     *
     * @param attachment commit attachment whose progress must be a {@link KinesisProgress}
     */
    @Override
    public void update(RLTaskTxnCommitAttachment attachment) {
        KinesisProgress newProgress = (KinesisProgress) attachment.getProgress();

        // The sequence number in newProgress is the last successfully consumed sequence number.
        // We store it directly (unlike Kafka where we add 1 to get the next offset).
        lock.lock();
        try {
            this.shardIdToSequenceNumber.putAll(newProgress.shardIdToSequenceNumber);
            // Refresh the per-shard lag as well; without this the values reported by the
            // task never reach the job-level progress.
            if (newProgress.shardIdToMillsBehindLatest != null) {
                this.shardIdToMillsBehindLatest.putAll(newProgress.shardIdToMillsBehindLatest);
            }
        } finally {
            lock.unlock();
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("update kinesis progress: {}, task: {}, job: {}",
                    newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
        }
    }

    /**
     * Get total progress as the count of tracked shards.
     * Unlike Kafka where we sum offsets, Kinesis sequence numbers are not additive.
     *
     * @return the number of shards being tracked; actual progress is better represented
     *         by millisBehindLatest
     */
    public Long totalProgress() {
        return (long) shardIdToSequenceNumber.size();
    }
}