KinesisDataSourceProperties.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload.kinesis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.LoadDataSourceType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.regex.Pattern;
/**
 * AWS Kinesis data source properties for Routine Load.
 *
 * <p>Kinesis is AWS's managed streaming data service, similar to Apache Kafka. Key differences from Kafka:
 * <ul>
 *   <li>Uses shards instead of partitions.</li>
 *   <li>Uses sequence numbers instead of offsets.</li>
 *   <li>Requires AWS authentication (IAM, access keys, etc.).</li>
 *   <li>Uses region-specific endpoints.</li>
 * </ul>
 *
 * <p>Example usage in SQL:
 * <pre>
 * CREATE ROUTINE LOAD my_job ON my_table
 * FROM KINESIS (
 *     "kinesis_region" = "us-east-1",
 *     "kinesis_stream" = "my-stream",
 *     "kinesis_access_key" = "AKIAIOSFODNN7EXAMPLE",
 *     "kinesis_secret_key" = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
 * );
 * </pre>
 *
 * <p>Besides the keys listed in {@code CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET}, any key starting with
 * {@code property.} is accepted; the prefix is stripped and the remainder is stored in
 * {@code customKinesisProperties}.
 */
public class KinesisDataSourceProperties extends AbstractDataSourceProperties {

    // Standard position keywords (the Kinesis analogue of Kafka's OFFSET_BEGINNING / OFFSET_END).
    public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON";
    public static final String POSITION_LATEST = "LATEST";
    public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP";

    /** Prefix marking user-supplied pass-through properties, e.g. {@code "property.foo" = "bar"}. */
    private static final String CUSTOM_KINESIS_PROPERTY_PREFIX = "property.";

    /** Shape of an AWS region name such as us-east-1, us-gov-west-1 or cn-north-1. */
    private static final Pattern REGION_PATTERN = Pattern.compile("^[a-z]{2}(-[a-z]+)?-[a-z]+-\\d$");

    /**
     * Kinesis sequence numbers are unsigned decimal strings. A digit-only check is used rather than
     * {@code new BigInteger(s)}, which would also accept signed values such as "-5" or "+5".
     */
    private static final Pattern SEQUENCE_NUMBER_PATTERN = Pattern.compile("\\d+");

    // Keys a user may set directly (i.e. without the "property." prefix).
    private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET =
            new ImmutableSet.Builder<String>()
                    .add(KinesisConfiguration.KINESIS_REGION.getName())
                    .add(KinesisConfiguration.KINESIS_STREAM.getName())
                    .add(KinesisConfiguration.KINESIS_ENDPOINT.getName())
                    .add(KinesisConfiguration.KINESIS_SHARDS.getName())
                    .add(KinesisConfiguration.KINESIS_POSITIONS.getName())
                    .add(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName())
                    .add(KinesisConfiguration.KINESIS_ACCESS_KEY.getName())
                    .add(KinesisConfiguration.KINESIS_SECRET_KEY.getName())
                    .add(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName())
                    .add(KinesisConfiguration.KINESIS_ROLE_ARN.getName())
                    .add(KinesisConfiguration.KINESIS_EXTERNAL_ID.getName())
                    .add(KinesisConfiguration.KINESIS_PROFILE_NAME.getName())
                    .add(KinesisConfiguration.KINESIS_CONSUMER_NAME.getName())
                    .add(KinesisConfiguration.KINESIS_MAX_RECORDS_PER_FETCH.getName())
                    .add(KinesisConfiguration.KINESIS_FETCH_INTERVAL_MS.getName())
                    .add(KinesisConfiguration.KINESIS_CONNECTION_TIMEOUT_MS.getName())
                    .add(KinesisConfiguration.KINESIS_REQUEST_TIMEOUT_MS.getName())
                    .add(KinesisConfiguration.KINESIS_MAX_RETRIES.getName())
                    .add(KinesisConfiguration.KINESIS_USE_HTTPS.getName())
                    .build();

    /**
     * Shard IDs paired with their starting position, as {@code Pair<ShardId, Position>}.
     *
     * <p>After {@link #convertAndCheckDataSourceProperties()} each position is one of:
     * <ul>
     *   <li>{@link #POSITION_TRIM_HORIZON}, {@link #POSITION_LATEST} or {@link #POSITION_AT_TIMESTAMP};</li>
     *   <li>a decimal sequence number;</li>
     *   <li>when {@code isPositionsForTimes} is true, the numeric timestamp (as a string) produced by
     *       {@code TimeUtils.timeStringToLong} from a user-supplied datetime string.</li>
     * </ul>
     */
    @Getter
    @Setter
    @SerializedName(value = "kinesisShardPositions")
    private List<Pair<String, String>> kinesisShardPositions = Lists.newArrayList();

    /**
     * Custom Kinesis properties for advanced configuration: "property."-prefixed keys (prefix stripped) plus
     * the AWS credential, connection and default-position keys copied from the original properties.
     *
     * <p>NOTE(review): this map holds AWS secrets verbatim and is part of the serialized state.
     */
    @Getter
    @SerializedName(value = "customKinesisProperties")
    private Map<String, String> customKinesisProperties;

    /** Whether the shard / default positions are timestamps rather than keywords or sequence numbers. */
    @Getter
    @SerializedName(value = "isPositionsForTimes")
    private boolean isPositionsForTimes = false;

    /** AWS region of the Kinesis stream. */
    @Getter
    @SerializedName(value = "region")
    private String region;

    /** Name of the Kinesis stream. */
    @Getter
    @SerializedName(value = "stream")
    private String stream;

    /** Optional endpoint URL for custom endpoints. */
    @Getter
    @SerializedName(value = "endpoint")
    private String endpoint;

    public KinesisDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiLoad) {
        super(dataSourceProperties, multiLoad);
    }

    public KinesisDataSourceProperties(Map<String, String> originalDataSourceProperties) {
        super(originalDataSourceProperties);
    }

    @Override
    protected String getDataSourceType() {
        return LoadDataSourceType.KINESIS.name();
    }

    @Override
    protected List<String> getRequiredProperties() {
        return Arrays.asList(
                KinesisConfiguration.KINESIS_REGION.getName(),
                KinesisConfiguration.KINESIS_STREAM.getName()
        );
    }

    /**
     * Validates the user-supplied properties and converts them into this object's fields.
     *
     * <p>Region and stream are mandatory unless this is an ALTER. Per-shard positions and a default position
     * are mutually exclusive; when neither is given, the default position falls back to {@link #POSITION_LATEST}.
     *
     * @throws UserException if a property is unknown, missing, malformed or inconsistent with another one
     */
    @Override
    public void convertAndCheckDataSourceProperties() throws UserException {
        checkNoUnsupportedProperties();

        // Region: required on CREATE; when present it must look like an AWS region name.
        this.region = KinesisConfiguration.KINESIS_REGION.getParameterValue(
                originalDataSourceProperties.get(KinesisConfiguration.KINESIS_REGION.getName()));
        if (!isAlter() && StringUtils.isBlank(region)) {
            throw new AnalysisException(KinesisConfiguration.KINESIS_REGION.getName() + " is a required property");
        }
        if (StringUtils.isNotBlank(region)) {
            validateRegion(region);
        }

        // Stream name: required on CREATE.
        this.stream = KinesisConfiguration.KINESIS_STREAM.getParameterValue(
                originalDataSourceProperties.get(KinesisConfiguration.KINESIS_STREAM.getName()));
        if (!isAlter() && StringUtils.isBlank(stream)) {
            throw new AnalysisException(KinesisConfiguration.KINESIS_STREAM.getName() + " is a required property");
        }

        // Optional custom endpoint.
        this.endpoint = KinesisConfiguration.KINESIS_ENDPOINT.getParameterValue(
                originalDataSourceProperties.get(KinesisConfiguration.KINESIS_ENDPOINT.getName()));

        analyzeCustomProperties();
        validateAwsAuthConfig();

        List<String> shards = KinesisConfiguration.KINESIS_SHARDS.getParameterValue(
                originalDataSourceProperties.get(KinesisConfiguration.KINESIS_SHARDS.getName()));
        if (CollectionUtils.isNotEmpty(shards)) {
            analyzeKinesisShardProperty(shards);
        }

        List<String> positions = KinesisConfiguration.KINESIS_POSITIONS.getParameterValue(
                originalDataSourceProperties.get(KinesisConfiguration.KINESIS_POSITIONS.getName()));
        // analyzeCustomProperties() has already copied the default position (if any) into customKinesisProperties.
        String defaultPositionString = customKinesisProperties.get(
                KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName());

        if (CollectionUtils.isNotEmpty(positions) && StringUtils.isNotBlank(defaultPositionString)) {
            throw new AnalysisException("Only one of " + KinesisConfiguration.KINESIS_POSITIONS.getName()
                    + " and " + KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName() + " can be set.");
        }
        // On ALTER, a shard list is only meaningful together with a position to start from.
        if (isAlter() && CollectionUtils.isNotEmpty(shards) && CollectionUtils.isEmpty(positions)
                && StringUtils.isBlank(defaultPositionString)) {
            throw new AnalysisException("Must set position or default position with shard property");
        }

        if (CollectionUtils.isNotEmpty(positions)) {
            this.isPositionsForTimes = analyzeKinesisPositionProperty(positions);
            return;
        }

        this.isPositionsForTimes = analyzeKinesisDefaultPositionProperty();
        if (CollectionUtils.isNotEmpty(kinesisShardPositions)) {
            // Re-read the default: analyzeKinesisDefaultPositionProperty() fills in LATEST when it was absent and
            // rewrites a datetime string into a numeric timestamp. Using the raw value read above would hand every
            // shard an unconverted datetime string while isPositionsForTimes is true.
            String resolvedDefaultPosition = customKinesisProperties.get(
                    KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName());
            setDefaultPositionForShards(this.kinesisShardPositions, resolvedDefaultPosition,
                    this.isPositionsForTimes);
        }
    }

    /**
     * Rejects any key that is neither a known Kinesis property nor prefixed with "property.".
     */
    private void checkNoUnsupportedProperties() throws AnalysisException {
        Optional<String> invalidProperty = originalDataSourceProperties.keySet().stream()
                .filter(key -> !CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET.contains(key))
                .filter(key -> !key.startsWith(CUSTOM_KINESIS_PROPERTY_PREFIX))
                .findFirst();
        if (invalidProperty.isPresent()) {
            throw new AnalysisException(invalidProperty.get() + " is invalid Kinesis property or cannot be set");
        }
    }

    /**
     * Validates the AWS region format (e.g. us-east-1, eu-west-2, ap-southeast-1, cn-north-1).
     */
    private void validateRegion(String region) throws AnalysisException {
        if (!REGION_PATTERN.matcher(region).matches()) {
            throw new AnalysisException("Invalid AWS region format: " + region
                    + ". Expected format like: us-east-1, eu-west-2, cn-north-1");
        }
    }

    /**
     * Builds {@code customKinesisProperties}: every "property.xxx" key is stored as "xxx", then the non-blank
     * credential, connection and default-position settings are copied under their own names (overwriting any
     * same-named "property." entry).
     */
    private void analyzeCustomProperties() throws AnalysisException {
        this.customKinesisProperties = new HashMap<>();
        for (Map.Entry<String, String> entry : originalDataSourceProperties.entrySet()) {
            String propertyKey = entry.getKey();
            if (!propertyKey.startsWith(CUSTOM_KINESIS_PROPERTY_PREFIX)) {
                continue;
            }
            String[] propertyKeyParts = propertyKey.split("\\.", 2);
            if (propertyKeyParts.length < 2 || propertyKeyParts[1].isEmpty()) {
                throw new AnalysisException("Kinesis property key format is invalid: " + propertyKey);
            }
            this.customKinesisProperties.put(propertyKeyParts[1], entry.getValue());
        }

        // AWS credentials / identity.
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_ACCESS_KEY.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_SECRET_KEY.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_ROLE_ARN.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_EXTERNAL_ID.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_PROFILE_NAME.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_CONSUMER_NAME.getName());

        // Client / connection settings.
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_MAX_RECORDS_PER_FETCH.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_FETCH_INTERVAL_MS.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_CONNECTION_TIMEOUT_MS.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_REQUEST_TIMEOUT_MS.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_MAX_RETRIES.getName());
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_USE_HTTPS.getName());

        // Default position, resolved later by analyzeKinesisDefaultPositionProperty().
        copyToCustomPropertiesIfSet(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName());
    }

    /**
     * Copies {@code key} from the original properties into {@code customKinesisProperties} when its value is
     * not blank.
     */
    private void copyToCustomPropertiesIfSet(String key) {
        String value = originalDataSourceProperties.get(key);
        if (StringUtils.isNotBlank(value)) {
            customKinesisProperties.put(key, value);
        }
    }

    /**
     * Validates that AWS authentication settings are internally consistent.
     *
     * <p>Access key and secret key must be given together, and an external ID requires a role ARN. No
     * authentication setting is mandatory, because the default credential chain (instance profiles, service
     * accounts, environment variables) may be used instead.
     */
    private void validateAwsAuthConfig() throws AnalysisException {
        String accessKey = customKinesisProperties.get(KinesisConfiguration.KINESIS_ACCESS_KEY.getName());
        String secretKey = customKinesisProperties.get(KinesisConfiguration.KINESIS_SECRET_KEY.getName());
        String roleArn = customKinesisProperties.get(KinesisConfiguration.KINESIS_ROLE_ARN.getName());

        if (StringUtils.isNotBlank(accessKey) && StringUtils.isBlank(secretKey)) {
            throw new AnalysisException("When " + KinesisConfiguration.KINESIS_ACCESS_KEY.getName()
                    + " is set, " + KinesisConfiguration.KINESIS_SECRET_KEY.getName() + " must also be set");
        }
        if (StringUtils.isNotBlank(secretKey) && StringUtils.isBlank(accessKey)) {
            throw new AnalysisException("When " + KinesisConfiguration.KINESIS_SECRET_KEY.getName()
                    + " is set, " + KinesisConfiguration.KINESIS_ACCESS_KEY.getName() + " must also be set");
        }

        String externalId = customKinesisProperties.get(KinesisConfiguration.KINESIS_EXTERNAL_ID.getName());
        if (StringUtils.isNotBlank(externalId) && StringUtils.isBlank(roleArn)) {
            throw new AnalysisException("When " + KinesisConfiguration.KINESIS_EXTERNAL_ID.getName()
                    + " is set, " + KinesisConfiguration.KINESIS_ROLE_ARN.getName() + " must also be set");
        }
    }

    /**
     * Appends one entry per shard ID, each starting at {@link #POSITION_LATEST} until a position is assigned.
     */
    private void analyzeKinesisShardProperty(List<String> shards) {
        shards.forEach(shardId -> this.kinesisShardPositions.add(Pair.of(shardId, POSITION_LATEST)));
    }

    /**
     * Assigns the i-th user position to the i-th shard.
     *
     * <p>Positions must be either all datetime strings (converted to numeric timestamps in the job's timezone)
     * or all keywords / sequence numbers (keywords normalized to their canonical upper-case constant).
     *
     * @return true if the positions are timestamps
     * @throws AnalysisException if the counts differ, kinds are mixed, or a position is invalid
     */
    private boolean analyzeKinesisPositionProperty(List<String> positions) throws UserException {
        if (positions.size() != kinesisShardPositions.size()) {
            throw new AnalysisException("Number of shards must equal number of positions");
        }

        boolean foundTime = false;
        boolean foundPosition = false;
        for (String position : positions) {
            if (TimeUtils.timeStringToLong(position) != -1) {
                foundTime = true;
            } else {
                foundPosition = true;
            }
        }
        if (foundTime && foundPosition) {
            throw new AnalysisException("Cannot mix timestamp and position values in "
                    + KinesisConfiguration.KINESIS_POSITIONS.getName());
        }

        if (foundTime) {
            TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone());
            for (int i = 0; i < positions.size(); i++) {
                long timestamp = TimeUtils.timeStringToLong(positions.get(i), timeZone);
                kinesisShardPositions.get(i).second = String.valueOf(timestamp);
            }
        } else {
            for (int i = 0; i < positions.size(); i++) {
                kinesisShardPositions.get(i).second = normalizePosition(positions.get(i));
            }
        }
        return foundTime;
    }

    /**
     * Validates a non-timestamp shard position and returns its canonical form: a position keyword matched
     * case-insensitively is returned as the upper-case constant; a sequence number is returned unchanged.
     *
     * <p>NOTE(review): {@link #POSITION_AT_TIMESTAMP} is accepted here as a bare keyword without an attached
     * timestamp; confirm how consumers of {@code kinesisShardPositions} handle it.
     */
    private String normalizePosition(String position) throws AnalysisException {
        if (POSITION_TRIM_HORIZON.equalsIgnoreCase(position)) {
            return POSITION_TRIM_HORIZON;
        }
        if (POSITION_LATEST.equalsIgnoreCase(position)) {
            return POSITION_LATEST;
        }
        if (POSITION_AT_TIMESTAMP.equalsIgnoreCase(position)) {
            return POSITION_AT_TIMESTAMP;
        }
        if (isValidSequenceNumber(position)) {
            return position;
        }
        throw new AnalysisException(KinesisConfiguration.KINESIS_POSITIONS.getName()
                + " must be TRIM_HORIZON, LATEST, AT_TIMESTAMP, or a valid sequence number. Got: " + position);
    }

    /**
     * Returns true if {@code position} is a non-empty string of decimal digits (a Kinesis sequence number).
     */
    private boolean isValidSequenceNumber(String position) {
        return position != null && SEQUENCE_NUMBER_PATTERN.matcher(position).matches();
    }

    /**
     * Resolves the default position in {@code customKinesisProperties}.
     *
     * <p>Falls back to {@link #POSITION_LATEST} when unset. A datetime string is replaced by its numeric
     * timestamp (job timezone); TRIM_HORIZON / LATEST are normalized to their canonical upper-case constants.
     *
     * @return true if the default position is a timestamp
     * @throws AnalysisException if the value is neither a datetime string nor TRIM_HORIZON / LATEST
     */
    private boolean analyzeKinesisDefaultPositionProperty() throws AnalysisException {
        String key = KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName();
        customKinesisProperties.putIfAbsent(key, POSITION_LATEST);
        String defaultPosition = customKinesisProperties.get(key);

        TimeZone timeZone = TimeUtils.getOrSystemTimeZone(this.getTimezone());
        long timestamp = TimeUtils.timeStringToLong(defaultPosition, timeZone);
        if (timestamp != -1) {
            customKinesisProperties.put(key, String.valueOf(timestamp));
            return true;
        }

        if (POSITION_TRIM_HORIZON.equalsIgnoreCase(defaultPosition)) {
            customKinesisProperties.put(key, POSITION_TRIM_HORIZON);
        } else if (POSITION_LATEST.equalsIgnoreCase(defaultPosition)) {
            customKinesisProperties.put(key, POSITION_LATEST);
        } else {
            throw new AnalysisException(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()
                    + " can only be set to TRIM_HORIZON, LATEST, or a datetime string. Got: " + defaultPosition);
        }
        return false;
    }

    /**
     * Sets every shard's position to {@code defaultPosition}. For non-timestamp positions a null default falls
     * back to {@link #POSITION_LATEST}.
     */
    private static void setDefaultPositionForShards(List<Pair<String, String>> shardPositions,
            String defaultPosition, boolean isForTimes) {
        String position = (isForTimes || defaultPosition != null) ? defaultPosition : POSITION_LATEST;
        shardPositions.forEach(pair -> pair.second = position);
    }
}