KinesisConfiguration.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload.kinesis;
import com.google.common.base.Splitter;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Configuration enum for AWS Kinesis data source properties.
 *
 * AWS Kinesis is a managed real-time data streaming service from AWS.
 * The main concepts in Kinesis are:
 * - Stream: Similar to Kafka topic, a named stream for data records
 * - Shard: Similar to Kafka partition, the base throughput unit of a stream
 * - Sequence Number: Similar to Kafka offset, a unique identifier for each record within a shard
 * - Consumer: Application that reads from a stream
 *
 * Each constant binds a property name to a default value (used when the property is absent)
 * and a converter that turns the raw string value into its typed form; see
 * {@link #getParameterValue(String)}.
 */
public enum KinesisConfiguration {
    /**
     * AWS region where the Kinesis stream is located.
     * Required property.
     * Example: us-east-1, ap-southeast-1, cn-north-1
     */
    KINESIS_REGION("kinesis_region", null, value -> value.trim()),
    /**
     * Name of the Kinesis stream to consume from.
     * Required property.
     */
    KINESIS_STREAM("kinesis_stream", null, value -> value.trim()),
    /**
     * Endpoint URL for Kinesis service (optional).
     * Used for custom endpoints like LocalStack for testing, or VPC endpoints.
     * If not specified, the default AWS endpoint for the region will be used.
     */
    KINESIS_ENDPOINT("kinesis_endpoint", null, value -> value.trim()),
    /**
     * Comma-separated list of shard IDs to consume from.
     * If not specified, all shards will be consumed.
     * Example: shardId-000000000000,shardId-000000000001
     *
     * All whitespace is removed and empty entries (e.g. from a doubled comma, or a blank value)
     * are dropped, so they never reach the consumer as bogus empty shard IDs.
     */
    KINESIS_SHARDS("kinesis_shards", null, shardsString ->
            Arrays.stream(shardsString.replaceAll("\\s+", "").split(","))
                    .filter(shard -> !shard.isEmpty())
                    .collect(Collectors.toList())),
    /**
     * Shard iterator positions (sequence numbers) for each shard.
     * Format: position1,position2,... corresponding to shards order.
     * Special values:
     * - TRIM_HORIZON: Start from the oldest record
     * - LATEST: Start from the newest record
     * - AT_TIMESTAMP: Start from a specific timestamp
     * - Specific sequence number
     *
     * Entries are trimmed, but empty entries are kept so that positions stay aligned
     * index-by-index with the shard list.
     */
    KINESIS_POSITIONS("kinesis_positions", null,
            positionsString -> Splitter.on(",").trimResults().splitToList(positionsString)),
    /**
     * Default starting position for new shards.
     * Valid values: TRIM_HORIZON, LATEST, AT_TIMESTAMP
     * Default: LATEST
     */
    KINESIS_DEFAULT_POSITION("kinesis_default_position", "LATEST", position -> position.trim()),
    /**
     * AWS Access Key ID for authentication.
     * Can be omitted if using IAM role, EC2 instance profile, or environment variables.
     * The value is passed through unchanged (not trimmed).
     */
    KINESIS_ACCESS_KEY("kinesis_access_key", null, value -> value),
    /**
     * AWS Secret Access Key for authentication.
     * Can be omitted if using IAM role, EC2 instance profile, or environment variables.
     * The value is passed through unchanged (not trimmed).
     */
    KINESIS_SECRET_KEY("kinesis_secret_key", null, value -> value),
    /**
     * AWS Session Token for temporary credentials.
     * Used with STS assume role.
     * The value is passed through unchanged (not trimmed).
     */
    KINESIS_SESSION_TOKEN("kinesis_session_token", null, value -> value),
    /**
     * IAM Role ARN to assume for accessing Kinesis.
     * Useful for cross-account access.
     */
    KINESIS_ROLE_ARN("kinesis_role_arn", null, value -> value.trim()),
    /**
     * External ID for IAM role assumption.
     * Additional security measure for cross-account access.
     */
    KINESIS_EXTERNAL_ID("kinesis_external_id", null, value -> value.trim()),
    /**
     * AWS Profile name to use from credentials file.
     * If not specified, uses default profile or environment credentials.
     */
    KINESIS_PROFILE_NAME("kinesis_profile_name", null, value -> value.trim()),
    /**
     * Consumer name for enhanced fan-out (EFO).
     * When specified, uses dedicated throughput via SubscribeToShard API.
     * Provides ~2MB/sec per shard with lower latency (~70ms).
     */
    KINESIS_CONSUMER_NAME("kinesis_consumer_name", null, value -> value.trim()),
    /**
     * Maximum records per GetRecords call.
     * Default: 10000 (Kinesis limit)
     * Higher values improve throughput but increase memory usage.
     */
    KINESIS_MAX_RECORDS_PER_FETCH("kinesis_max_records_per_fetch", 10000, KinesisConfiguration::parseTrimmedInt),
    /**
     * Interval between GetRecords calls in milliseconds.
     * Default: 200ms (to stay within Kinesis 5 calls/sec limit per shard)
     */
    KINESIS_FETCH_INTERVAL_MS("kinesis_fetch_interval_ms", 200, KinesisConfiguration::parseTrimmedInt),
    /**
     * Connection timeout in milliseconds.
     * Default: 10000 (10 seconds)
     */
    KINESIS_CONNECTION_TIMEOUT_MS("kinesis_connection_timeout_ms", 10000, KinesisConfiguration::parseTrimmedInt),
    /**
     * Request timeout in milliseconds.
     * Default: 10000 (10 seconds)
     */
    KINESIS_REQUEST_TIMEOUT_MS("kinesis_request_timeout_ms", 10000, KinesisConfiguration::parseTrimmedInt),
    /**
     * Max number of retry attempts for Kinesis API calls.
     * Default: 3
     */
    KINESIS_MAX_RETRIES("kinesis_max_retries", 3, KinesisConfiguration::parseTrimmedInt),
    /**
     * Use HTTPS for Kinesis endpoint.
     * Default: true
     * Set to false only for local testing (e.g., LocalStack).
     */
    KINESIS_USE_HTTPS("kinesis_use_https", true, KinesisConfiguration::parseTrimmedBoolean);

    private final String name;
    private final Object defaultValue;
    // Wildcard result type: each constant's converter yields a different type, and declaring it
    // this way avoids the unchecked (Function<String, Object>) cast in the constructor.
    private final Function<String, ?> converter;

    /**
     * The type parameter ties each constant's default value to its converter's result type,
     * so the compiler checks that they agree.
     */
    <T> KinesisConfiguration(String name, T defaultValue, Function<String, T> converter) {
        this.name = name;
        this.defaultValue = defaultValue;
        this.converter = converter;
    }

    /**
     * Returns the property key as written in the job properties (e.g. {@code kinesis_region}).
     *
     * @return the property name
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the value used when the property is not supplied.
     *
     * @return the default value, or {@code null} if the property has no default
     */
    public Object getDefaultValue() {
        return defaultValue;
    }

    /**
     * Looks up a configuration constant by its property name.
     *
     * @param name the property name, e.g. {@code kinesis_stream}
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has that name
     */
    public static KinesisConfiguration getByName(String name) {
        return Arrays.stream(KinesisConfiguration.values())
                .filter(config -> config.getName().equals(name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown Kinesis configuration: " + name));
    }

    /**
     * Converts a raw property value into its typed form, falling back to the default.
     *
     * The result is cast unchecked to the caller's expected type, so a mismatch surfaces as a
     * {@link ClassCastException} at the call site, not here.
     *
     * @param param the raw property string, or {@code null} if the property was not set
     * @param <T> the type the caller expects
     * @return the converted value, or {@link #getDefaultValue()} when {@code param} is {@code null}
     * @throws NumberFormatException if an integer-valued property is not a valid int
     */
    @SuppressWarnings("unchecked")
    public <T> T getParameterValue(String param) {
        Object value = param != null ? converter.apply(param) : defaultValue;
        return (T) value;
    }

    /** Parses a decimal int after trimming surrounding whitespace, so " 200 " is accepted. */
    private static int parseTrimmedInt(String value) {
        return Integer.parseInt(value.trim());
    }

    /**
     * Parses a boolean after trimming surrounding whitespace, so "true " is not silently read as
     * false. As with {@link Boolean#parseBoolean(String)}, any value other than a case-insensitive
     * "true" yields false.
     */
    private static boolean parseTrimmedBoolean(String value) {
        return Boolean.parseBoolean(value.trim());
    }
}