// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.routineload.kafka;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.KafkaProgress;
import org.apache.doris.load.routineload.LoadDataSourceType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.regex.Pattern;

/**
* Kafka data source properties
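 * <p>
 * A minimal usage sketch (illustrative only; the concrete key strings such as
 * "kafka_broker_list" and "kafka_topic" are assumed to match the names defined
 * in {@link KafkaConfiguration}):
 * <pre>
 * Map&lt;String, String&gt; props = new HashMap&lt;&gt;();
 * props.put("kafka_broker_list", "broker1:9092,broker2:9092");
 * props.put("kafka_topic", "my_topic");
 * KafkaDataSourceProperties properties = new KafkaDataSourceProperties(props);
 * properties.convertAndCheckDataSourceProperties();
 * </pre>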
*/
public class KafkaDataSourceProperties extends AbstractDataSourceProperties {
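    // Matches a single broker endpoint such as "kafka01:9092" or "192.168.0.1:9092";
    // each comma-separated entry of the broker list is checked against this pattern.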
private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
private static final String CUSTOM_KAFKA_PROPERTY_PREFIX = "property.";
@Getter
@Setter
@SerializedName(value = "kafkaPartitionOffsets")
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
@Getter
@SerializedName(value = "customKafkaProperties")
private Map<String, String> customKafkaProperties;
@Getter
@SerializedName(value = "isOffsetsForTimes")
private boolean isOffsetsForTimes = false;
@Getter
@SerializedName(value = "brokerList")
private String brokerList;
@Getter
@SerializedName(value = "topic")
private String topic;
    /**
     * The table name properties of the kafka data source, used for multi-table load.
     * <p>
     * table_name_location: where the table name is located
     * 1. in the key of the kafka message
     * 2. in the value of the kafka message
     * <p>
     * table_name_format: format used to parse the table name
     * 1. json format
     * 2. txt format
     * <p>
     * table_name_regex: regex used to extract the table name
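     * <p>
     * Illustrative example (the concrete key strings come from
     * {@link KafkaConfiguration}; the values shown are assumptions): with
     * table_name_format = "text", a field delimiter of "|" and a field index of 0,
     * the table name is read from the first "|"-separated field of the message.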
*/
@Getter
@SerializedName(value = "tableNameProperties")
private Map<String, String> tableNameProperties;
private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET =
new ImmutableSet.Builder<String>().add(KafkaConfiguration.KAFKA_BROKER_LIST.getName())
.add(KafkaConfiguration.KAFKA_TOPIC.getName())
.add(KafkaConfiguration.KAFKA_PARTITIONS.getName())
.add(KafkaConfiguration.KAFKA_OFFSETS.getName())
.add(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName())
.add(KafkaConfiguration.KAFKA_TABLE_NAME_LOCATION.getName())
.add(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName())
.add(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName())
.add(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName())
.build();
public KafkaDataSourceProperties(Map<String, String> dataSourceProperties, boolean multiLoad) {
super(dataSourceProperties, multiLoad);
}
public KafkaDataSourceProperties(Map<String, String> originalDataSourceProperties) {
super(originalDataSourceProperties);
}
@Override
protected String getDataSourceType() {
return LoadDataSourceType.KAFKA.name();
}
@Override
protected List<String> getRequiredProperties() {
return Arrays.asList(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), KafkaConfiguration.KAFKA_TOPIC.getName());
}
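    // Validate and convert the raw data source properties:
    // 1. reject keys that are neither configurable nor "property."-prefixed;
    // 2. check the broker list and topic;
    // 3. extract custom kafka properties;
    // 4. analyze partitions and (default) offsets.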
@Override
public void convertAndCheckDataSourceProperties() throws UserException {
Optional<String> optional = originalDataSourceProperties.keySet()
.stream().filter(entity -> !CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET.contains(entity))
.filter(entity -> !entity.startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)).findFirst();
if (optional.isPresent()) {
            throw new AnalysisException(optional.get() + " is an invalid kafka property or cannot be set");
}
this.brokerList = KafkaConfiguration.KAFKA_BROKER_LIST.getParameterValue(originalDataSourceProperties
.get(KafkaConfiguration.KAFKA_BROKER_LIST.getName()));
if (!isAlter() && StringUtils.isBlank(brokerList)) {
throw new AnalysisException(KafkaConfiguration.KAFKA_BROKER_LIST.getName() + " is a required property");
}
//check broker list
if (StringUtils.isNotBlank(brokerList)) {
for (String broker : brokerList.split(",")) {
if (!Pattern.matches(ENDPOINT_REGEX, broker)) {
                    throw new AnalysisException(KafkaConfiguration.KAFKA_BROKER_LIST.getName()
                            + ":" + broker + " does not match pattern " + ENDPOINT_REGEX);
}
}
}
//check topic
this.topic = KafkaConfiguration.KAFKA_TOPIC.getParameterValue(originalDataSourceProperties
.get(KafkaConfiguration.KAFKA_TOPIC.getName()));
if (!isAlter() && StringUtils.isBlank(topic)) {
throw new AnalysisException(KafkaConfiguration.KAFKA_TOPIC.getName() + " is a required property");
}
        // check custom kafka properties
        // This should be done before checking partitions and offsets, because we need
        // KAFKA_DEFAULT_OFFSETS, which is carried in the custom properties.
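        // e.g. a user-specified "property." + KAFKA_DEFAULT_OFFSETS key ends up in
        // "customKafkaProperties" under KAFKA_DEFAULT_OFFSETS after prefix stripping.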
analyzeCustomProperties();
List<Integer> partitions = KafkaConfiguration.KAFKA_PARTITIONS.getParameterValue(originalDataSourceProperties
.get(KafkaConfiguration.KAFKA_PARTITIONS.getName()));
if (CollectionUtils.isNotEmpty(partitions)) {
analyzeKafkaPartitionProperty(partitions);
}
//check offset
List<String> offsets = KafkaConfiguration.KAFKA_OFFSETS.getParameterValue(originalDataSourceProperties
.get(KafkaConfiguration.KAFKA_OFFSETS.getName()));
String defaultOffsetString = originalDataSourceProperties
.get(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName());
if (CollectionUtils.isNotEmpty(offsets) && StringUtils.isNotBlank(defaultOffsetString)) {
throw new AnalysisException("Only one of " + KafkaConfiguration.KAFKA_OFFSETS.getName() + " and "
+ KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName() + " can be set.");
}
if (multiTable) {
checkAndSetMultiLoadProperties();
}
if (isAlter() && CollectionUtils.isNotEmpty(partitions) && CollectionUtils.isEmpty(offsets)
&& StringUtils.isBlank(defaultOffsetString)) {
// if this is an alter operation, the partition and (default)offset must be set together.
throw new AnalysisException("Must set offset or default offset with partition property");
}
if (CollectionUtils.isNotEmpty(offsets)) {
this.isOffsetsForTimes = analyzeKafkaOffsetProperty(offsets);
return;
}
this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty();
if (CollectionUtils.isNotEmpty(kafkaPartitionOffsets)) {
defaultOffsetString = customKafkaProperties.get(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName());
setDefaultOffsetForPartition(this.kafkaPartitionOffsets, defaultOffsetString, this.isOffsetsForTimes);
}
}
private void checkAndSetMultiLoadProperties() throws AnalysisException {
String tableNameFormat = KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getParameterValue(
originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName()));
if (!KafkaConfigType.TableNameFormat.TEXT.name().equalsIgnoreCase(tableNameFormat)) {
throw new AnalysisException("Multi load olay supported for table name format TEXT");
}
String tableNameDelimiter = KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getParameterValue(
originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName()));
Integer tableNameIndex = KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getParameterValue(
originalDataSourceProperties.get(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName()));
tableNameProperties = new HashMap<>();
tableNameProperties.put(KafkaConfiguration.KAFKA_TABLE_NAME_FORMAT.getName(), tableNameFormat);
tableNameProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_DELIMITER.getName(), tableNameDelimiter);
tableNameProperties.put(KafkaConfiguration.KAFKA_TEXT_TABLE_NAME_FIELD_INDEX.getName(),
String.valueOf(tableNameIndex));
}
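    // Apply the default offset to every partition in "kafkaPartitionOffsets".
    // If the default offset is a timestamp (isOffsetsForTimes), store it as-is;
    // it is resolved to a real offset when the job runs. Otherwise, map
    // OFFSET_BEGINNING / OFFSET_END to their sentinel values.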
private static void setDefaultOffsetForPartition(List<Pair<Integer, Long>> kafkaPartitionOffsets,
String kafkaDefaultOffsetString, boolean isOffsetsForTimes) {
if (isOffsetsForTimes) {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
pair.second = Long.valueOf(kafkaDefaultOffsetString);
}
} else {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
pair.second = KafkaProgress.OFFSET_BEGINNING_VAL;
} else {
pair.second = KafkaProgress.OFFSET_END_VAL;
}
}
}
}
// init "kafkaPartitionOffsets" with partition property.
// The offset will be set to OFFSET_END for now, and will be changed in later analysis process.
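    // e.g. partitions [0, 1, 2] yield
    // [(0, OFFSET_END_VAL), (1, OFFSET_END_VAL), (2, OFFSET_END_VAL)].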
private void analyzeKafkaPartitionProperty(List<Integer> partitions) {
partitions.forEach(partition -> this.kafkaPartitionOffsets
.add(Pair.of(partition, KafkaProgress.OFFSET_END_VAL)));
}
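    // Collect "property."-prefixed entries into "customKafkaProperties" with the
    // prefix stripped, e.g. "property.group.id" -> "group.id".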
private void analyzeCustomProperties() throws AnalysisException {
this.customKafkaProperties = new HashMap<>();
for (Map.Entry<String, String> dataSourceProperty : originalDataSourceProperties.entrySet()) {
if (dataSourceProperty.getKey().startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)) {
String propertyKey = dataSourceProperty.getKey();
String propertyValue = dataSourceProperty.getValue();
                String[] propertyKeyArr = propertyKey.split("\\.");
                if (propertyKeyArr.length < 2) {
                    throw new AnalysisException("custom kafka property key must not be empty after the \""
                            + CUSTOM_KAFKA_PROPERTY_PREFIX + "\" prefix");
                }
this.customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue);
}
            // can be extended in the future with other prefixes
}
}
    // Fill each partition's offset with the given offset strings.
    // Return true if the offsets are specified by timestamp.
private boolean analyzeKafkaOffsetProperty(List<String> kafkaOffsetsStringList) throws UserException {
if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) {
throw new AnalysisException("Partitions number should be equals to offsets number");
}
// We support two ways to specify the offset,
// one is to specify the offset directly, the other is to specify a timestamp.
// Doris will get the offset of the corresponding partition through the timestamp.
// The user can only choose one of these methods.
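        // e.g. "1000,2000,3000" (direct offsets) or
        // "2023-01-01 00:00:00,2023-01-02 00:00:00" (timestamps) are both valid,
        // but mixing the two styles in one list is rejected (values are illustrative).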
boolean foundTime = false;
boolean foundOffset = false;
for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
foundTime = true;
} else {
foundOffset = true;
}
}
if (foundTime && foundOffset) {
throw new AnalysisException("The offset of the partition cannot be specified by the timestamp "
+ "and the offset at the same time");
}
if (foundTime) {
// convert all datetime strs to timestamps
// and set them as the partition's offset.
// These timestamps will be converted to real offset when job is running.
TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone());
for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStr, timeZone);
Preconditions.checkState(timestamp != -1);
kafkaPartitionOffsets.get(i).second = timestamp;
}
} else {
for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
} else if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
} else if (NumberUtils.isDigits(kafkaOffsetsStr)) {
kafkaPartitionOffsets.get(i).second = NumberUtils.toLong(kafkaOffsetsStr);
} else {
throw new AnalysisException(KafkaConfiguration.KAFKA_OFFSETS.getName()
+ " must be an integer or a date time");
}
}
}
return foundTime;
}
    // If the default offset is not set, set it to OFFSET_END.
    // If the offset is in datetime format, convert it to a timestamp,
    // and also save the original datetime-formatted offset
    // in "customKafkaProperties".
    // Return true if the offset is in datetime format.
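    // e.g. a default offset of "2023-01-01 00:00:00" (illustrative) is stored as its
    // epoch timestamp, and the original string is kept under KAFKA_ORIGIN_DEFAULT_OFFSETS.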
private boolean analyzeKafkaDefaultOffsetProperty() throws AnalysisException {
customKafkaProperties.putIfAbsent(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName(), KafkaProgress.OFFSET_END);
String defaultOffsetStr = customKafkaProperties.get(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName());
TimeZone timeZone = TimeUtils.getOrSystemTimeZone(this.getTimezone());
long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, timeZone);
if (defaultOffset != -1) {
// this is a datetime format offset
customKafkaProperties.put(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName(),
String.valueOf(defaultOffset));
// we convert datetime to timestamp, and save the origin datetime formatted offset for further use.
customKafkaProperties.put(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName(), defaultOffsetStr);
return true;
} else {
if (!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)
&& !defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
throw new AnalysisException(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()
+ " can only be set to OFFSET_BEGINNING, OFFSET_END or date time");
}
return false;
}
}
}