KinesisConfiguration.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload.kinesis;
import com.google.common.base.Splitter;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Configuration enum for AWS Kinesis data source properties.
*
* Parameters are divided into two categories:
* 1. AWS Client parameters (aws.*): region, endpoint, credentials, timeouts
* 2. Kinesis-specific parameters (aws.kinesis.*): stream, shards, positions, API settings
*/
public enum KinesisConfiguration {
/**
* AWS region where the Kinesis stream is located.
* Required property.
*/
KINESIS_REGION("aws.region", null, value -> value.trim()),
/**
* Optional custom endpoint URL for Kinesis service.
* Useful for LocalStack or VPC endpoints.
*/
KINESIS_ENDPOINT("aws.endpoint", null, value -> value.trim()),
/**
* Name of the Kinesis stream to consume from.
* Required property.
*/
KINESIS_STREAM("kinesis_stream", null, value -> value.trim()),
/**
* Comma-separated list of shard IDs to consume from.
* If not specified, all shards will be consumed.
*/
KINESIS_SHARDS("kinesis_shards", null, shardsString ->
Arrays.stream(shardsString.replace(" ", "").split(","))
.collect(Collectors.toList())),
/**
* Shard iterator positions (sequence numbers) for each shard.
* Format: position1,position2,... corresponding to shards order.
*/
KINESIS_POSITIONS("kinesis_shards_pos", null,
positionsString -> Splitter.on(",").trimResults().splitToList(positionsString)),
/**
* Default starting position for new shards.
* Valid values: TRIM_HORIZON, LATEST, AT_TIMESTAMP
*/
KINESIS_DEFAULT_POSITION("property.kinesis_default_pos", "LATEST", position -> position.trim()),
/**
* AWS Access Key ID for authentication.
*/
KINESIS_ACCESS_KEY("aws.access_key", null, value -> value),
/**
* AWS Secret Access Key for authentication.
*/
KINESIS_SECRET_KEY("aws.secret_key", null, value -> value),
/**
* AWS Session Token for temporary credentials.
*/
KINESIS_SESSION_TOKEN("aws.session_key", null, value -> value),
/**
* IAM Role ARN to assume for accessing Kinesis.
*/
KINESIS_ROLE_ARN("aws.role_arn", null, value -> value.trim()),
/**
* External ID for IAM role assumption.
*/
KINESIS_EXTERNAL_ID("aws.external.id", null, value -> value.trim()),
/**
* AWS Profile name to use from credentials file.
*/
KINESIS_PROFILE_NAME("aws.profile.name", null, value -> value.trim()),
/**
* Consumer name for enhanced fan-out (EFO).
*/
KINESIS_CONSUMER_NAME("aws.kinesis.consumer.name", null, value -> value.trim()),
/**
* Maximum records per GetRecords call.
*/
KINESIS_MAX_RECORDS_PER_FETCH("aws.kinesis.max_records", 10000, Integer::parseInt),
/**
* Interval between GetRecords calls in milliseconds.
*/
KINESIS_FETCH_INTERVAL_MS("aws.kinesis.fetch_interval_ms", 200, Integer::parseInt),
/**
* Connection timeout in milliseconds.
*/
KINESIS_CONNECTION_TIMEOUT_MS("aws.connection.timeout.ms", 10000, Integer::parseInt),
/**
* Request timeout in milliseconds.
*/
KINESIS_REQUEST_TIMEOUT_MS("aws.request.timeout.ms", 10000, Integer::parseInt),
/**
* Max number of retry attempts for Kinesis API calls.
*/
KINESIS_MAX_RETRIES("aws.kinesis.max_retries", 3, Integer::parseInt),
/**
* Use HTTPS for Kinesis endpoint.
*/
KINESIS_USE_HTTPS("aws.kinesis.use_https", true, Boolean::parseBoolean);
private final String name;
private final Object defaultValue;
private final Function<String, Object> converter;
<T> KinesisConfiguration(String name, T defaultValue, Function<String, T> converter) {
this.name = name;
this.defaultValue = defaultValue;
this.converter = (Function<String, Object>) converter;
}
public String getName() {
return name;
}
public Object getDefaultValue() {
return defaultValue;
}
public static KinesisConfiguration getByName(String name) {
return Arrays.stream(KinesisConfiguration.values())
.filter(config -> config.getName().equals(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown Kinesis configuration: " + name));
}
@SuppressWarnings("unchecked")
public <T> T getParameterValue(String param) {
Object value = param != null ? converter.apply(param) : defaultValue;
return (T) value;
}
}