KinesisRoutineLoadJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.datasource.kinesis.KinesisUtil;
import org.apache.doris.load.routineload.kinesis.KinesisConfiguration;
import org.apache.doris.load.routineload.kinesis.KinesisDataSourceProperties;
import org.apache.doris.nereids.load.NereidsImportColumnDesc;
import org.apache.doris.nereids.load.NereidsLoadTaskInfo;
import org.apache.doris.nereids.load.NereidsLoadUtils;
import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* KinesisRoutineLoadJob is a RoutineLoadJob that fetches data from AWS Kinesis streams.
*
* Key concepts:
* - Stream: Named collection of data records (similar to Kafka topic)
* - Shard: Sequence of data records in a stream (similar to Kafka partition)
* - Sequence Number: Unique identifier for each record within a shard (similar to Kafka offset)
* - Consumer: Application that reads from a stream
*
* The progress tracks sequence numbers for each shard, represented as:
* {"shardId-000000000000": "49590338271490256608559692538361571095921575989136588802", ...}
*/
public class KinesisRoutineLoadJob extends RoutineLoadJob {
private static final Logger LOG = LogManager.getLogger(KinesisRoutineLoadJob.class);
public static final String KINESIS_FILE_CATALOG = "kinesis";
// Region supplied at construction time; may be replaced by modifyPropertiesInternal().
@SerializedName("rg")
private String region;
// Name of the Kinesis stream this job reads from; changing it resets the job progress.
@SerializedName("stm")
private String stream;
// Optional endpoint; stays null unless the data source properties provide one.
@SerializedName("ep")
private String endpoint;
// Optional: the shards the user explicitly asked to load
// (the Kinesis counterpart of the custom partitions of a Kafka job).
// When non-empty, shard discovery is skipped and this list is used as-is.
@SerializedName("csks")
private List<String> customKinesisShards = Lists.newArrayList();
// Shards currently being consumed.
// Refreshed periodically because shards may split or merge.
private List<String> currentKinesisShards = Lists.newArrayList();
// Default starting position for new shards.
// Values: TRIM_HORIZON, LATEST, or a timestamp string.
// Empty means "not configured"; convertedDefaultPosition() then falls back to LATEST.
private String kinesisDefaultPosition = "";
// Custom Kinesis properties including AWS credentials and client settings.
@SerializedName("prop")
private Map<String, String> customProperties = Maps.newHashMap();
// Copy of customProperties built by convertCustomProperties(); this is the map
// handed to KinesisUtil when listing shards.
private Map<String, String> convertedCustomProperties = Maps.newHashMap();
// Per-shard MillisBehindLatest values, keyed by shard id.
// Filled from the progress carried by task commit attachments
// (see updateProgressAndOffsetsCache()) and read by hasMoreDataToConsume(),
// getLag() and totalLag(). Nothing in this class refreshes it from Kinesis directly.
private Map<String, Long> cachedShardWithMillsBehindLatest = Maps.newConcurrentMap();
// Shards most recently discovered by updateKinesisShards(); compared against
// currentKinesisShards in isKinesisShardsChanged().
private List<String> newCurrentKinesisShards = Lists.newArrayList();
/**
 * No-argument constructor kept for (de)serialization.
 * Passes the placeholder id {@code -1} to the parent; all other fields keep their defaults.
 */
public KinesisRoutineLoadJob() {
    super(-1, LoadDataSourceType.KINESIS);
}
/**
 * Creates a job that loads a Kinesis stream into one table.
 *
 * @param id job id
 * @param name job name
 * @param dbId id of the target database
 * @param tableId id of the target table
 * @param region AWS region of the stream
 * @param stream Kinesis stream name
 * @param userIdentity identity forwarded to the parent constructor
 */
public KinesisRoutineLoadJob(Long id, String name, long dbId, long tableId,
        String region, String stream, UserIdentity userIdentity) {
    super(id, name, dbId, tableId, LoadDataSourceType.KINESIS, userIdentity);
    // A new job starts with an empty progress.
    this.progress = new KinesisProgress();
    this.stream = stream;
    this.region = region;
}
/**
 * Creates a job without a fixed target table and sets its multi-table flag.
 *
 * @param id job id
 * @param name job name
 * @param dbId id of the target database
 * @param region AWS region of the stream
 * @param stream Kinesis stream name
 * @param userIdentity identity forwarded to the parent constructor
 * @param isMultiTable value passed to {@code setMultiTable}
 */
public KinesisRoutineLoadJob(Long id, String name, long dbId,
        String region, String stream,
        UserIdentity userIdentity, boolean isMultiTable) {
    super(id, name, dbId, LoadDataSourceType.KINESIS, userIdentity);
    // A new job starts with an empty progress.
    this.progress = new KinesisProgress();
    this.stream = stream;
    this.region = region;
    setMultiTable(isMultiTable);
}
/**
 * @return the AWS region configured for this job
 */
public String getRegion() {
    return this.region;
}
/**
 * @return the name of the Kinesis stream this job reads from
 */
public String getStream() {
    return this.stream;
}
/**
 * @return the configured endpoint, or {@code null} when none was provided
 */
public String getEndpoint() {
    return this.endpoint;
}
/**
 * @return the live (not copied) map of converted custom properties
 */
public Map<String, String> getConvertedCustomProperties() {
    return this.convertedCustomProperties;
}
/**
 * Rebuilds the converted custom properties.
 * The rebuild is forced on every call because the underlying information may
 * have changed since the last conversion.
 *
 * @throws UserException if the conversion fails
 */
@Override
public void prepare() throws UserException {
    final boolean forceRebuild = true;
    convertCustomProperties(forceRebuild);
}
/**
 * Copies {@code customProperties} into {@code convertedCustomProperties} and picks up
 * the default shard position if one is configured.
 *
 * <p>Nothing happens when there are no custom properties, or when a previous conversion
 * exists and {@code rebuild} is false.
 *
 * @param rebuild when true, the converted map is cleared and rebuilt from scratch
 * @throws DdlException declared for callers; not thrown by the current body
 */
private void convertCustomProperties(boolean rebuild) throws DdlException {
    boolean alreadyConverted = !convertedCustomProperties.isEmpty();
    if (customProperties.isEmpty() || (!rebuild && alreadyConverted)) {
        return;
    }
    if (rebuild) {
        convertedCustomProperties.clear();
    }
    convertedCustomProperties.putAll(customProperties);
    // The default position is remembered locally but intentionally left in the
    // converted map as well, so BE can use it.
    String positionKey = KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName();
    if (convertedCustomProperties.containsKey(positionKey)) {
        kinesisDefaultPosition = convertedCustomProperties.get(positionKey);
    }
}
/**
 * @return the configured default position, or {@code KinesisProgress.POSITION_LATEST}
 *         when none has been configured (empty string)
 */
private String convertedDefaultPosition() {
    return this.kinesisDefaultPosition.isEmpty()
            ? KinesisProgress.POSITION_LATEST
            : this.kinesisDefaultPosition;
}
/**
 * Splits the current shards across {@code currentConcurrentTaskNum} new tasks and
 * hands the tasks to the routine load task scheduler.
 *
 * <p>Shards are dealt out round-robin: task {@code i} receives shards
 * {@code i, i + n, i + 2n, ...}. Tasks are only created while the job is in
 * {@code NEED_SCHEDULE}; if at least one task was created the job moves to
 * {@code RUNNING}. The whole operation runs under the job write lock.
 *
 * @param currentConcurrentTaskNum number of tasks to create
 * @throws UserException propagated from the state update
 */
@Override
public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
    List<RoutineLoadTaskInfo> createdTasks = new ArrayList<>();
    writeLock();
    try {
        if (state != JobState.NEED_SCHEDULE) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignore to divide routine load job while job state {}", state);
            }
        } else {
            for (int taskIdx = 0; taskIdx < currentConcurrentTaskNum; taskIdx++) {
                // Collect every currentConcurrentTaskNum-th shard, starting at taskIdx.
                Map<String, String> shardToSequenceNumber = Maps.newHashMap();
                int shardIdx = taskIdx;
                while (shardIdx < currentKinesisShards.size()) {
                    String shardId = currentKinesisShards.get(shardIdx);
                    shardToSequenceNumber.put(shardId,
                            ((KinesisProgress) progress).getSequenceNumberByShard(shardId));
                    shardIdx += currentConcurrentTaskNum;
                }
                KinesisTaskInfo task = new KinesisTaskInfo(UUID.randomUUID(), id,
                        getTimeout() * 1000, shardToSequenceNumber, isMultiTable(), -1, false);
                routineLoadTaskInfoList.add(task);
                createdTasks.add(task);
            }
            // Only switch to RUNNING when there is something to run.
            if (!createdTasks.isEmpty()) {
                unprotectUpdateState(JobState.RUNNING, null, false);
            }
        }
        // Queue the new tasks (possibly none) for scheduling.
        Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(createdTasks);
    } finally {
        writeUnlock();
    }
}
/**
 * Computes and stores the task concurrency for this job as
 * {@code min(shard count, desired concurrency, configured maximum)}.
 *
 * <p>A desired concurrency of 0 is replaced by the configured maximum first
 * (and that replacement is stored back into {@code desireTaskConcurrentNum}).
 *
 * @return the concurrency that was stored in {@code currentTaskConcurrentNum}
 */
@Override
public int calculateCurrentConcurrentTaskNum() {
    final int shardNum = currentKinesisShards.size();
    if (desireTaskConcurrentNum == 0) {
        desireTaskConcurrentNum = Config.max_routine_load_task_concurrent_num;
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("current concurrent task number is min(shard num: {}, desire task concurrent num: {}, config: {})",
                shardNum, desireTaskConcurrentNum, Config.max_routine_load_task_concurrent_num);
    }
    int boundedByDesire = Math.min(shardNum, desireTaskConcurrentNum);
    currentTaskConcurrentNum = Math.min(boundedByDesire, Config.max_routine_load_task_concurrent_num);
    return currentTaskConcurrentNum;
}
/**
 * Decides whether the job progress should be updated for a transaction.
 *
 * @param rlTaskTxnCommitAttachment attachment of the finished task (used for logging only)
 * @param txnState transaction whose status is inspected
 * @param txnStatusChangeReason reason of the status change (used for logging only)
 * @return true when the transaction is COMMITTED or VISIBLE, false otherwise
 */
@Override
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
        TransactionState txnState,
        TransactionState.TxnStatusChangeReason txnStatusChangeReason) {
    TransactionStatus status = txnState.getTransactionStatus();
    boolean committedOrVisible = status == TransactionStatus.COMMITTED
            || status == TransactionStatus.VISIBLE;
    if (!committedOrVisible && LOG.isDebugEnabled()) {
        LOG.debug("no need to update the progress of kinesis routine load. txn status: {}, "
                        + "txnStatusChangeReason: {}, task: {}, job: {}",
                txnState.getTransactionStatus(), txnStatusChangeReason,
                DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()), id);
    }
    return committedOrVisible;
}
/**
 * Folds the per-shard MillisBehindLatest values carried by a task attachment into the
 * local cache and then applies the attachment to the job progress.
 *
 * <p>For each shard the larger of the cached and the reported value is kept.
 * NOTE(review): because of the max-merge a cached value can never decrease in this
 * method — confirm this is intended, since hasMoreDataToConsume(), getLag() and
 * totalLag() all read this cache.
 *
 * @param attachment commit attachment whose progress is a {@code KinesisProgress}
 */
private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
    KinesisProgress reported = (KinesisProgress) attachment.getProgress();
    Map<String, Long> cache = cachedShardWithMillsBehindLatest;
    reported.getShardIdToMillsBehindLatest().forEach((shardId, millis) -> {
        cache.merge(shardId, millis, Math::max);
    });
    this.progress.update(attachment);
}
/**
 * Updates the local cache and progress from the attachment first, then lets the
 * parent class run its own progress update.
 *
 * @param attachment commit attachment of the finished task
 * @throws UserException propagated from the parent implementation
 */
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
    this.updateProgressAndOffsetsCache(attachment);
    super.updateProgress(attachment);
}
/**
 * Replay variant of the progress update: the parent replay runs first, then the
 * local cache and progress are updated from the attachment.
 *
 * @param attachment commit attachment being replayed
 */
@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
    super.replayUpdateProgress(attachment);
    this.updateProgressAndOffsetsCache(attachment);
}
/**
 * Replaces a task with a fresh one that covers the same shards, starting from the
 * sequence numbers currently recorded in the job progress.
 *
 * @param routineLoadTaskInfo the task to replace; must be a {@code KinesisTaskInfo}
 * @param delaySchedule value passed to the new task's {@code setDelaySchedule}
 * @return the newly created task, already registered in {@code routineLoadTaskInfoList}
 */
@Override
protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo, boolean delaySchedule) {
    KinesisTaskInfo expiredTask = (KinesisTaskInfo) routineLoadTaskInfo;
    KinesisTaskInfo renewedTask = new KinesisTaskInfo(
            expiredTask,
            ((KinesisProgress) progress).getShardIdToSequenceNumber(expiredTask.getShards()),
            isMultiTable());
    renewedTask.setDelaySchedule(delaySchedule);
    // Swap the old task for the new one in the job's task list.
    routineLoadTaskInfoList.remove(routineLoadTaskInfo);
    routineLoadTaskInfoList.add(renewedTask);
    return renewedTask;
}
/**
 * Makes sure every currently consumed shard has a position recorded in the progress.
 *
 * @throws UserException propagated from {@code updateNewShardProgress}
 */
@Override
protected void unprotectUpdateProgress() throws UserException {
    this.updateNewShardProgress();
}
/**
 * Refreshes the shard list of the stream (the Kinesis counterpart of refreshing
 * Kafka partitions; the method name comes from the parent class).
 *
 * <p>Discovery only runs when the job is RUNNING or NEED_SCHEDULE, or when an auto
 * resume is requested, and only when the user did not pin custom shards.
 *
 * @param needAutoResume whether the caller intends to auto-resume the job
 * @return false only when shard discovery was attempted and failed; true otherwise
 * @throws UserException propagated from shard discovery
 */
@Override
protected boolean refreshKafkaPartitions(boolean needAutoResume) throws UserException {
    boolean schedulable = this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE;
    if (!schedulable && !needAutoResume) {
        return true;
    }
    boolean shardsPinnedByUser = customKinesisShards != null && !customKinesisShards.isEmpty();
    if (shardsPinnedByUser) {
        return true;
    }
    return updateKinesisShards();
}
/**
 * Fetches the shard list and stores it in {@code newCurrentKinesisShards}.
 *
 * <p>On any failure the error is logged; if the job is in NEED_SCHEDULE it is
 * additionally moved to PAUSED with a PARTITIONS_ERR reason.
 *
 * @return true when the shard list was refreshed, false when fetching it failed
 * @throws UserException propagated from the state update
 */
private boolean updateKinesisShards() throws UserException {
    List<String> discoveredShards;
    try {
        discoveredShards = getAllKinesisShards();
    } catch (Exception e) {
        String msg = e.getMessage()
                + " may be Kinesis properties set in job is error or no shard in this stream that should check Kinesis";
        LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                .add("error_msg", msg)
                .build(), e);
        if (this.state == JobState.NEED_SCHEDULE) {
            ErrorReason reason = new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg);
            unprotectUpdateState(JobState.PAUSED, reason, false /* not replay */);
        }
        return false;
    }
    this.newCurrentKinesisShards = discoveredShards;
    return true;
}
/**
 * Tells whether the job has to be rescheduled because its shard set changed.
 * Jobs that are neither RUNNING nor NEED_SCHEDULE never need a reschedule.
 *
 * @return true when the job is RUNNING/NEED_SCHEDULE and its shards changed
 * @throws UserException propagated from the shard comparison
 */
@Override
protected boolean unprotectNeedReschedule() throws UserException {
    boolean schedulable = this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE;
    if (!schedulable) {
        return false;
    }
    return isKinesisShardsChanged();
}
/**
 * Compares the shards being consumed with the most recently discovered shards and
 * adopts the discovered list when they differ.
 *
 * <ul>
 *   <li>User-pinned shards: they become the current shards and no change is reported.</li>
 *   <li>Discovered list contains a shard that is not current, or is a strictly smaller
 *       subset of the current list: the discovered list is adopted and a change is
 *       reported.</li>
 *   <li>Otherwise a change is reported only if some current shard has no entry in the
 *       progress yet.</li>
 * </ul>
 *
 * @return true when the job has to be rescheduled
 * @throws UserException declared for callers; not thrown by the current body
 */
private boolean isKinesisShardsChanged() throws UserException {
    if (CollectionUtils.isNotEmpty(customKinesisShards)) {
        currentKinesisShards = customKinesisShards;
        return false;
    }
    Preconditions.checkNotNull(this.newCurrentKinesisShards);

    boolean nothingNew = new HashSet<>(currentKinesisShards).containsAll(this.newCurrentKinesisShards);
    boolean shardsDisappeared = nothingNew
            && currentKinesisShards.size() > this.newCurrentKinesisShards.size();
    if (!nothingNew || shardsDisappeared) {
        currentKinesisShards = this.newCurrentKinesisShards;
        if (LOG.isDebugEnabled()) {
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                    .add("current_kinesis_shards", Joiner.on(",").join(currentKinesisShards))
                    .add("msg", "current kinesis shards has been changed")
                    .build());
        }
        return true;
    }

    // Same shard set as before: still reschedule if the progress lacks one of them.
    for (String shardId : currentKinesisShards) {
        if (!((KinesisProgress) progress).containsShard(shardId)) {
            return true;
        }
    }
    return false;
}
/**
 * Checks, under the job write lock, whether a PAUSED job should be resumed
 * automatically according to {@code ScheduleRule}.
 *
 * @return true only when the job is PAUSED and the schedule rule asks for a resume
 */
@Override
protected boolean needAutoResume() {
    writeLock();
    try {
        return this.state == JobState.PAUSED && ScheduleRule.isNeedAutoSchedule(this);
    } finally {
        writeUnlock();
    }
}
/**
 * @return the job statistic summary serialized as JSON (HTML escaping disabled)
 */
@Override
public String getStatistic() {
    Map<String, Object> statisticSummary = this.jobStatistic.summary();
    return new GsonBuilder()
            .disableHtmlEscaping()
            .create()
            .toJson(statisticSummary);
}
/**
 * Returns the shard ids this job should consume.
 *
 * <p>The custom properties are converted first if that has not happened yet. If the
 * user pinned shards, that list is returned as-is; otherwise the lookup is delegated
 * to {@code KinesisUtil.getAllKinesisShards} with the job's region, stream, endpoint
 * and converted properties.
 *
 * @return the pinned shard list, or the list reported by {@code KinesisUtil}
 * @throws UserException if the property conversion or the lookup fails
 */
private List<String> getAllKinesisShards() throws UserException {
    convertCustomProperties(false);
    return customKinesisShards.isEmpty()
            ? KinesisUtil.getAllKinesisShards(region, stream, endpoint, convertedCustomProperties)
            : customKinesisShards;
}
/**
 * Builds a {@code KinesisRoutineLoadJob} from the analyzed CREATE ROUTINE LOAD info.
 *
 * <p>For a single-table job the target table is resolved and validated first,
 * including the rule that {@code load_to_single_tablet} requires a randomly
 * distributed table. Multi-table jobs skip the table checks.
 *
 * @param info analyzed create statement; its data source properties must be
 *             {@code KinesisDataSourceProperties}
 * @param ctx connection context providing the current user identity
 * @return the new job with optional settings applied
 * @throws UserException if the database/table lookup or any validation fails
 */
public static KinesisRoutineLoadJob fromCreateInfo(CreateRoutineLoadInfo info, ConnectContext ctx)
        throws UserException {
    Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDBName());
    long id = Env.getCurrentEnv().getNextId();
    KinesisDataSourceProperties kinesisProperties =
            (KinesisDataSourceProperties) info.getDataSourceProperties();

    final KinesisRoutineLoadJob job;
    if (!kinesisProperties.isMultiTable()) {
        OlapTable olapTable = db.getOlapTableOrDdlException(info.getTableName());
        checkMeta(olapTable, info.getRoutineLoadDesc());
        // load_to_single_tablet only makes sense on a randomly distributed table.
        boolean randomDistribution = olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo;
        if (info.isLoadToSingleTablet() && !randomDistribution) {
            throw new DdlException(
                    "if load_to_single_tablet set to true, the olap table must be with random distribution");
        }
        long tableId = olapTable.getId();
        job = new KinesisRoutineLoadJob(id, info.getName(), db.getId(), tableId,
                kinesisProperties.getRegion(), kinesisProperties.getStream(),
                ctx.getCurrentUserIdentity());
    } else {
        job = new KinesisRoutineLoadJob(id, info.getName(), db.getId(),
                kinesisProperties.getRegion(), kinesisProperties.getStream(),
                ctx.getCurrentUserIdentity(), true);
    }
    job.setOptional(info);
    job.checkCustomProperties();
    return job;
}
/**
 * Validation hook for the custom properties.
 * Currently a no-op: no validation is implemented yet.
 *
 * @throws DdlException declared for future validation failures
 */
private void checkCustomProperties() throws DdlException {
    // Intentionally empty: validate custom properties here if needed.
}
/**
 * Registers a starting position in the job progress for every shard in
 * {@code currentKinesisShards} that the progress does not know yet.
 *
 * <p>The starting position of a new shard is the job's default position
 * (see {@code getNewShardPositionsFromDefault}).
 *
 * <p>If a {@code UserException} is raised the job is moved to PAUSED with a
 * PARTITIONS_ERR reason and the exception is rethrown.
 *
 * @throws UserException if resolving or registering a shard position fails
 */
private void updateNewShardProgress() throws UserException {
    try {
        for (String shardId : currentKinesisShards) {
            if (((KinesisProgress) progress).containsShard(shardId)) {
                continue;
            }
            List<Pair<String, String>> newShardPositions =
                    getNewShardPositionsFromDefault(Collections.singletonList(shardId));
            // One shard in, exactly one position out.
            Preconditions.checkState(newShardPositions.size() == 1);
            for (Pair<String, String> shardPosition : newShardPositions) {
                ((KinesisProgress) progress).addShardPosition(shardPosition);
                if (LOG.isDebugEnabled()) {
                    // Build the message explicitly, like every other LogBuilder use in this class.
                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                            .add("kinesis_shard_id", shardPosition.first)
                            .add("begin_position", shardPosition.second)
                            .add("msg", "The new shard has been added in job")
                            .build());
                }
            }
        }
    } catch (UserException e) {
        unprotectUpdateState(JobState.PAUSED,
                new ErrorReason(InternalErrorCode.PARTITIONS_ERR, e.getMessage()), false);
        throw e;
    }
}
/**
 * Pairs each given shard with the job's default starting position.
 *
 * @param newShards shard ids that need a starting position
 * @return one (shardId, defaultPosition) pair per input shard, in input order
 * @throws UserException declared for callers; not thrown by the current body
 */
private List<Pair<String, String>> getNewShardPositionsFromDefault(List<String> newShards)
        throws UserException {
    final String startPosition = convertedDefaultPosition();
    List<Pair<String, String>> positions = Lists.newArrayList();
    newShards.forEach(shardId -> positions.add(Pair.of(shardId, startPosition)));
    return positions;
}
/**
 * Applies the optional settings of a create statement: first the common ones handled
 * by the parent, then the Kinesis specific endpoint, pinned shards with their
 * positions, and custom properties. Each Kinesis setting is only applied when present.
 *
 * @param info analyzed create statement; its data source properties must be
 *             {@code KinesisDataSourceProperties}
 * @throws UserException propagated from the parent or from registering shard positions
 */
protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
    super.setOptional(info);
    KinesisDataSourceProperties sourceProps =
            (KinesisDataSourceProperties) info.getDataSourceProperties();

    String configuredEndpoint = sourceProps.getEndpoint();
    if (configuredEndpoint != null) {
        this.endpoint = configuredEndpoint;
    }

    boolean hasPinnedShards = CollectionUtils.isNotEmpty(sourceProps.getKinesisShardPositions());
    if (hasPinnedShards) {
        setCustomKinesisShards(sourceProps);
    }

    Map<String, String> kinesisProps = sourceProps.getCustomKinesisProperties();
    if (MapUtils.isNotEmpty(kinesisProps)) {
        setCustomKinesisProperties(kinesisProps);
    }
}
/**
 * Records the user-pinned shards and seeds the progress with their positions.
 *
 * @param kinesisDataSourceProperties source of the (shardId, position) pairs
 * @throws LoadException declared for callers
 */
private void setCustomKinesisShards(KinesisDataSourceProperties kinesisDataSourceProperties) throws LoadException {
    List<Pair<String, String>> pinned = kinesisDataSourceProperties.getKinesisShardPositions();
    for (int idx = 0; idx < pinned.size(); idx++) {
        Pair<String, String> shardPosition = pinned.get(idx);
        this.customKinesisShards.add(shardPosition.first);
        ((KinesisProgress) progress).addShardPosition(shardPosition);
    }
}
/**
 * Replaces the job's custom Kinesis properties with a private copy of the given map.
 *
 * <p>A copy is stored rather than the caller's map because
 * {@code modifyPropertiesInternal} later calls {@code putAll} on
 * {@code customProperties}; sharing the reference would mutate the caller's map,
 * or fail if that map is unmodifiable.
 *
 * @param kinesisProperties properties to copy; {@code null} is treated as empty
 */
private void setCustomKinesisProperties(Map<String, String> kinesisProperties) {
    this.customProperties = kinesisProperties == null
            ? Maps.newHashMap()
            : Maps.newHashMap(kinesisProperties);
}
/**
 * Serializes the data source description as JSON: region, stream, the endpoint
 * (only when set) and the current shard ids, sorted and comma separated.
 *
 * @return the JSON string (HTML escaping disabled)
 */
@Override
public String dataSourcePropertiesJsonToString() {
    // Sort a copy so the job's own shard order is left untouched.
    List<String> shardsInOrder = Lists.newArrayList(currentKinesisShards);
    Collections.sort(shardsInOrder);

    Map<String, String> description = Maps.newHashMap();
    description.put("region", region);
    description.put("stream", stream);
    if (endpoint != null) {
        description.put("endpoint", endpoint);
    }
    description.put("currentKinesisShards", Joiner.on(",").join(shardsInOrder));

    return new GsonBuilder().disableHtmlEscaping().create().toJson(description);
}
/**
 * Serializes the custom properties as JSON with the secret key and session token,
 * if present, replaced by {@code ******}.
 *
 * @return the JSON string (HTML escaping disabled)
 */
@Override
public String customPropertiesJsonToString() {
    // Work on a copy so the real credentials stay intact.
    Map<String, String> masked = new HashMap<>(customProperties);
    String[] sensitiveKeys = {
            KinesisConfiguration.KINESIS_SECRET_KEY.getName(),
            KinesisConfiguration.KINESIS_SESSION_TOKEN.getName(),
    };
    for (String sensitiveKey : sensitiveKeys) {
        // replace() only touches keys that are already present.
        masked.replace(sensitiveKey, "******");
    }
    Gson gson = new GsonBuilder().disableHtmlEscaping().create();
    return gson.toJson(masked);
}
/**
 * @return a new map holding {@code kinesis_region}, {@code kinesis_stream} and,
 *         when an endpoint is set, {@code kinesis_endpoint}
 */
@Override
public Map<String, String> getDataSourceProperties() {
    Map<String, String> props = Maps.newHashMap();
    if (endpoint != null) {
        props.put("kinesis_endpoint", endpoint);
    }
    props.put("kinesis_stream", stream);
    props.put("kinesis_region", region);
    return props;
}
/**
 * Returns the custom properties with every key prefixed by {@code property.} and the
 * secret key / session token values replaced by {@code ******}.
 *
 * @return a new map; the job's own properties are not modified
 */
@Override
public Map<String, String> getCustomProperties() {
    final String secretKeyName = KinesisConfiguration.KINESIS_SECRET_KEY.getName();
    final String sessionTokenName = KinesisConfiguration.KINESIS_SESSION_TOKEN.getName();
    Map<String, String> prefixed = new HashMap<>();
    for (Map.Entry<String, String> property : customProperties.entrySet()) {
        String key = property.getKey();
        boolean sensitive = key.equals(secretKeyName) || key.equals(sessionTokenName);
        prefixed.put("property." + key, sensitive ? "******" : property.getValue());
    }
    return prefixed;
}
/**
 * Applies an ALTER ROUTINE LOAD command to this job and writes the change to the
 * edit log. Runs under the job write lock.
 *
 * @param command the analyzed alter command; its data source properties must be
 *                {@code KinesisDataSourceProperties}
 * @throws UserException if the job is not PAUSED or the modification fails
 */
@Override
public void modifyProperties(AlterRoutineLoadCommand command) throws UserException {
    Map<String, String> alteredJobProperties = command.getAnalyzedJobProperties();
    KinesisDataSourceProperties alteredSourceProperties =
            (KinesisDataSourceProperties) command.getDataSourceProperties();
    writeLock();
    try {
        boolean paused = getState() == JobState.PAUSED;
        if (!paused) {
            throw new DdlException("Only supports modification of PAUSED jobs");
        }
        modifyPropertiesInternal(alteredJobProperties, alteredSourceProperties);
        // Persist the change only after it was applied successfully.
        Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(
                new AlterRoutineLoadJobOperationLog(this.id, alteredJobProperties, alteredSourceProperties));
    } finally {
        writeUnlock();
    }
}
/**
 * Applies modified job and data source properties to this job. Shared by the live
 * ALTER path and by edit-log replay.
 *
 * <p>Data source changes are applied in this order:
 * <ol>
 *   <li>custom properties are merged and re-converted;</li>
 *   <li>requested shard positions are validated against the current progress;</li>
 *   <li>a stream change resets the progress and the per-shard MillisBehindLatest
 *       cache, because the cache is keyed by shard id only and its entries belong
 *       to the previous stream;</li>
 *   <li>the requested shard positions are written into the (possibly fresh) progress;</li>
 *   <li>region and endpoint are updated.</li>
 * </ol>
 * Positions are applied after the stream reset on purpose: applying them before would
 * let the reset silently discard them when one command changes both.
 *
 * @param jobProperties analyzed job properties to apply; may be empty
 * @param dataSourceProperties data source changes, or {@code null} when there are none
 * @throws UserException if property conversion or shard validation fails
 */
private void modifyPropertiesInternal(Map<String, String> jobProperties,
        KinesisDataSourceProperties dataSourceProperties)
        throws UserException {
    if (dataSourceProperties != null) {
        List<Pair<String, String>> shardPositions = Lists.newArrayList();
        Map<String, String> customKinesisProperties = Maps.newHashMap();
        if (MapUtils.isNotEmpty(dataSourceProperties.getOriginalDataSourceProperties())) {
            shardPositions = dataSourceProperties.getKinesisShardPositions();
            customKinesisProperties = dataSourceProperties.getCustomKinesisProperties();
        }
        // Update custom properties
        if (!customKinesisProperties.isEmpty()) {
            this.customProperties.putAll(customKinesisProperties);
            convertCustomProperties(true);
        }
        // Validate the requested shard positions before anything is reset.
        if (!shardPositions.isEmpty()) {
            ((KinesisProgress) progress).checkShards(shardPositions);
        }
        // Modify stream if provided; progress and lag cache of the old stream are dropped.
        if (!Strings.isNullOrEmpty(dataSourceProperties.getStream())) {
            this.stream = dataSourceProperties.getStream();
            this.progress = new KinesisProgress();
            this.cachedShardWithMillsBehindLatest.clear();
        }
        // Apply the shard positions last so a stream reset cannot discard them.
        if (!shardPositions.isEmpty()) {
            ((KinesisProgress) progress).modifyPosition(shardPositions);
        }
        // Modify region if provided
        if (!Strings.isNullOrEmpty(dataSourceProperties.getRegion())) {
            this.region = dataSourceProperties.getRegion();
        }
        // Modify endpoint if provided
        if (!Strings.isNullOrEmpty(dataSourceProperties.getEndpoint())) {
            this.endpoint = dataSourceProperties.getEndpoint();
        }
    }
    if (!jobProperties.isEmpty()) {
        // modifyCommonJobProperties works on a copy, which is then merged into the job.
        Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
        modifyCommonJobProperties(copiedJobProperties);
        this.jobProperties.putAll(copiedJobProperties);
        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
            this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
        }
        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
            String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
            // Anything other than ERROR (case-insensitive) falls back to APPEND.
            if ("ERROR".equalsIgnoreCase(policy)) {
                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
            } else {
                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
            }
        }
    }
    LOG.info("modify the properties of kinesis routine load job: {}, jobProperties: {}, datasource properties: {}",
            this.id, jobProperties, dataSourceProperties);
}
/**
 * Re-applies a logged property modification during edit-log replay.
 * A failure is logged as an error and not rethrown.
 *
 * @param log the logged modification; its data source properties must be
 *            {@code KinesisDataSourceProperties}
 */
@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
    try {
        KinesisDataSourceProperties loggedSourceProperties =
                (KinesisDataSourceProperties) log.getDataSourceProperties();
        modifyPropertiesInternal(log.getJobProperties(), loggedSourceProperties);
    } catch (UserException e) {
        LOG.error("failed to replay modify kinesis routine load job: {}", id, e);
    }
}
/**
 * @return the per-shard lag, as computed by the progress from the cached
 *         MillisBehindLatest values, serialized as JSON
 */
@Override
public String getLag() {
    KinesisProgress kinesisProgress = (KinesisProgress) progress;
    Map<String, Long> lagByShard = kinesisProgress.getLag(cachedShardWithMillsBehindLatest);
    return new Gson().toJson(lagByShard);
}
/**
 * @return always {@code TFileCompressType.PLAIN}
 */
@Override
public TFileCompressType getCompressType() {
    final TFileCompressType uncompressed = TFileCompressType.PLAIN;
    return uncompressed;
}
/**
 * @return the job's {@code maxFilterRatio}
 */
@Override
public double getMaxFilterRatio() {
    return this.maxFilterRatio;
}
/**
 * @return the total progress as reported by the job's {@code KinesisProgress}
 */
@Override
public Long totalProgress() {
    KinesisProgress kinesisProgress = (KinesisProgress) progress;
    return kinesisProgress.totalProgress();
}
/**
 * Sums the per-shard lag values, ignoring negative entries.
 *
 * @return the sum of all non-negative per-shard lag values
 */
@Override
public Long totalLag() {
    Map<String, Long> lagByShard = ((KinesisProgress) progress).getLag(cachedShardWithMillsBehindLatest);
    long total = 0L;
    for (Long lag : lagByShard.values()) {
        // Negative values are skipped rather than subtracted.
        if (lag >= 0) {
            total += lag;
        }
    }
    return total;
}
/**
 * Decides whether a task may still have data to consume, based on the cached
 * per-shard MillisBehindLatest values.
 *
 * <p>No call to Kinesis is made here; the cache is only fed by committed task
 * attachments. The answer is {@code true} when
 * <ul>
 *   <li>the cache is empty (nothing committed yet, so consume optimistically), or</li>
 *   <li>any shard of the task has no cached value or a cached value greater than 0.</li>
 * </ul>
 * Only when every shard of the task has a cached value of 0 is {@code false} returned.
 *
 * @param taskId the task id (used for logging only)
 * @param shardIdToSequenceNumber the task's shards; only the keys are inspected
 * @return true if there may be more data to consume
 * @throws UserException declared for callers; not thrown by the current body
 */
public boolean hasMoreDataToConsume(UUID taskId, Map<String, String> shardIdToSequenceNumber)
        throws UserException {
    if (cachedShardWithMillsBehindLatest.isEmpty()) {
        return true;
    }
    boolean anyShardBehindOrUnknown = shardIdToSequenceNumber.keySet().stream()
            .anyMatch(shardId -> {
                Long millisBehind = cachedShardWithMillsBehindLatest.get(shardId);
                return millisBehind == null || millisBehind > 0;
            });
    if (anyShardBehindOrUnknown) {
        return true;
    }
    // Every shard of this task reports MillisBehindLatest == 0.
    if (LOG.isDebugEnabled()) {
        LOG.debug("All shards caught up (MillisBehindLatest=0), skip scheduling. job {}, task {}",
                id, taskId);
    }
    return false;
}
/**
 * Builds the Nereids view of this job's load settings.
 *
 * <p>The delete condition, preceding filter, where expression and every column
 * mapping expression are rendered to SQL (without table qualification) and re-parsed
 * as Nereids expressions; absent expressions stay {@code null}.
 *
 * @return a new {@code NereidsRoutineLoadTaskInfo} carrying the job's settings
 * @throws UserException propagated from expression parsing
 */
@Override
public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException {
    Expression deleteCondition = null;
    if (getDeleteCondition() != null) {
        deleteCondition = NereidsLoadUtils.parseExpressionSeq(
                getDeleteCondition().accept(ExprToSqlVisitor.INSTANCE,
                        ToSqlParams.WITHOUT_TABLE)).get(0);
    }

    Expression precedingFilter = null;
    if (getPrecedingFilter() != null) {
        precedingFilter = NereidsLoadUtils.parseExpressionSeq(
                getPrecedingFilter().accept(ExprToSqlVisitor.INSTANCE,
                        ToSqlParams.WITHOUT_TABLE)).get(0);
    }

    Expression whereExpr = null;
    if (getWhereExpr() != null) {
        whereExpr = NereidsLoadUtils.parseExpressionSeq(getWhereExpr().accept(
                ExprToSqlVisitor.INSTANCE, ToSqlParams.WITHOUT_TABLE)).get(0);
    }

    NereidsLoadTaskInfo.NereidsImportColumnDescs importColumnDescs = null;
    if (columnDescs != null) {
        importColumnDescs = new NereidsLoadTaskInfo.NereidsImportColumnDescs();
        for (ImportColumnDesc desc : columnDescs.descs) {
            // A column without a mapping expression keeps a null expression.
            Expression mapping = null;
            if (desc.getExpr() != null) {
                mapping = NereidsLoadUtils.parseExpressionSeq(desc.getExpr().accept(
                        ExprToSqlVisitor.INSTANCE, ToSqlParams.WITHOUT_TABLE)).get(0);
            }
            importColumnDescs.descs.add(new NereidsImportColumnDesc(desc.getColumnName(), mapping));
        }
    }

    return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
            partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
            precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
            loadToSingleTablet, uniqueKeyUpdateMode, partialUpdateNewKeyPolicy, memtableOnSinkNode);
}
/**
 * No-op: cloud mode is not supported for Kinesis yet.
 *
 * @throws UserException declared by the parent contract; never thrown here
 */
@Override
public void updateCloudProgress() throws UserException {
    // Intentionally empty.
}
/**
 * No-op: cloud mode is not supported for Kinesis yet.
 *
 * @param attachment ignored
 */
@Override
protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
    // Intentionally empty.
}
}