KinesisUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.kinesis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Helpers for fetching AWS Kinesis stream metadata on the FE.
 *
 * <p>The FE does not call Kinesis itself: metadata requests are proxied through a BE node via
 * {@link BackendServiceProxy#getInfo}. Backend selection uses the routine-load blacklist and the
 * routine-load get-meta metrics.
 */
public class KinesisUtil {
    private static final Logger LOG = LogManager.getLogger(KinesisUtil.class);

    /** Maximum number of attempts (each on a randomly chosen BE) for a single metadata request. */
    private static final int MAX_RETRY_TIMES = 3;

    /**
     * Get all shard IDs for the given Kinesis stream.
     * This delegates to a BE node which uses the AWS SDK to call ListShards API.
     *
     * @param region AWS region of the stream
     * @param stream Kinesis stream name
     * @param endpoint optional custom endpoint (e.g. LocalStack); {@code null} or empty string for default
     * @param convertedCustomProperties AWS credentials and other properties; {@code null} is treated as empty
     * @return list of shard IDs (e.g. ["shardId-000000000000", "shardId-000000000001"])
     * @throws LoadException if the shard list could not be obtained; the underlying failure is
     *         attached as the cause
     */
    public static List<String> getAllKinesisShards(String region, String stream, String endpoint,
            Map<String, String> convertedCustomProperties) throws LoadException {
        try {
            InternalService.PKinesisLoadInfo.Builder kinesisInfoBuilder =
                    InternalService.PKinesisLoadInfo.newBuilder()
                            .setRegion(region)
                            .setStream(stream);
            // A missing property map means "no extra properties" rather than an NPE.
            if (convertedCustomProperties != null) {
                kinesisInfoBuilder.addAllProperties(convertedCustomProperties.entrySet().stream()
                        .map(e -> InternalService.PStringPair.newBuilder()
                                .setKey(e.getKey())
                                .setVal(e.getValue())
                                .build())
                        .collect(Collectors.toList()));
            }
            if (endpoint != null && !endpoint.isEmpty()) {
                kinesisInfoBuilder.setEndpoint(endpoint);
            }
            // NOTE(review): the Kafka meta timeout is reused here; there is no Kinesis-specific setting.
            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder()
                    .setKinesisMetaRequest(
                            InternalService.PKinesisMetaProxyRequest.newBuilder()
                                    .setKinesisInfo(kinesisInfoBuilder))
                    .setTimeoutSecs(Config.max_get_kafka_meta_timeout_second)
                    .build();
            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
                    .getKinesisMetaResult().getShardIdsList();
        } catch (Exception e) {
            LoadException loadException = new LoadException(
                    "Failed to get shards of Kinesis stream: " + stream + ". error: " + e.getMessage());
            // Keep the original failure (type and stack trace) for diagnosis instead of only its message.
            loadException.initCause(e);
            throw loadException;
        }
    }

    /**
     * Sends {@code request} to a randomly chosen BE and returns the first successful result.
     *
     * <p>Up to {@link #MAX_RETRY_TIMES} attempts are made. A backend whose RPC fails, times out or
     * returns a non-OK status is excluded from later attempts of this call. If a later attempt
     * succeeds, the backends that failed before it are added to the routine-load blacklist.
     *
     * @param request the proxy request to send
     * @param timeout how long to wait for each attempt's result, in seconds
     * @return the first result whose status code is {@code OK}
     * @throws LoadException if no backend is available, every attempt fails, or the calling thread
     *         is interrupted while waiting (the interrupt flag is restored in that case)
     */
    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request,
            int timeout) throws LoadException {
        long startTime = System.currentTimeMillis();
        int retryTimes = 0;
        Set<Long> failedBeIds = new HashSet<>();
        TStatusCode code = null;
        String errorMsg = null;
        try {
            while (retryTimes < MAX_RETRY_TIMES) {
                List<Long> backendIds = collectCandidateBackendIds(failedBeIds);
                if (backendIds.isEmpty()) {
                    MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
                    if (failedBeIds.isEmpty()) {
                        errorMsg = "no alive backends";
                    }
                    throw new LoadException("failed to get kinesis info: " + errorMsg + ",");
                }
                Collections.shuffle(backendIds);
                long beId = backendIds.get(0);
                Backend be = Env.getCurrentSystemInfo().getBackend(beId);
                if (be == null) {
                    // The backend was dropped after the candidate list was built. Consume an attempt so the
                    // loop still terminates, but do not blacklist an id that no longer exists.
                    errorMsg = "backend " + beId + " does not exist";
                    retryTimes++;
                    continue;
                }
                TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

                InternalService.PProxyResult result;
                Future<InternalService.PProxyResult> future = null;
                try {
                    future = BackendServiceProxy.getInstance().getInfo(address, request);
                    result = future.get(timeout, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: further waits would fail immediately and wrongly mark
                    // healthy backends as failed.
                    Thread.currentThread().interrupt();
                    if (future != null) {
                        future.cancel(true);
                    }
                    MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
                    throw new LoadException("interrupted while getting kinesis info from " + address);
                } catch (Exception e) {
                    // Do not leave a timed-out RPC running in the background.
                    if (future != null) {
                        future.cancel(true);
                    }
                    errorMsg = e.getMessage();
                    LOG.warn("failed to get kinesis info request to {} err {}", address, e.getMessage());
                    failedBeIds.add(beId);
                    retryTimes++;
                    continue;
                }

                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                if (code == TStatusCode.OK) {
                    return result;
                }
                errorMsg = result.getStatus().getErrorMsgsList().toString();
                LOG.warn("failed to get kinesis info request to {} err {}", address,
                        result.getStatus().getErrorMsgsList());
                failedBeIds.add(beId);
                retryTimes++;
            }
            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
            throw new LoadException("failed to get kinesis info: " + errorMsg + ",");
        } finally {
            // Blacklisting lives in finally so that it happens on every exit path. Failed backends are
            // blacklisted only when a later backend answered OK. That presumably means the failures were
            // backend-specific rather than caused by the request itself.
            if (code == TStatusCode.OK && !failedBeIds.isEmpty()) {
                for (Long failedBeId : failedBeIds) {
                    Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(failedBeId);
                    LOG.info("add beId {} to blacklist, blacklist: {}", failedBeId,
                            Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
                }
            }
            long endTime = System.currentTimeMillis();
            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
        }
    }

    /**
     * Returns the ids of backends that may serve a meta request in this attempt.
     *
     * <p>A backend qualifies if it exists, is load-available, is not decommissioned, is not in the
     * routine-load blacklist and has not already failed in this call. If none qualify, the
     * currently blacklisted backends that still exist are returned instead. For example, when the
     * only non-blacklisted backend was decommissioned, a blacklisted backend is better than none.
     *
     * @param failedBeIds backends that already failed during the current request
     * @return a mutable list of candidate backend ids, possibly empty
     */
    private static List<Long> collectCandidateBackendIds(Set<Long> failedBeIds) {
        List<Long> backendIds = new ArrayList<>();
        for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
            if (backend != null && backend.isLoadAvailable()
                    && !backend.isDecommissioned()
                    && !failedBeIds.contains(beId)
                    && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
                backendIds.add(beId);
            }
        }
        if (backendIds.isEmpty()) {
            for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
                // Blacklist entries can outlive a dropped backend; skip ids that no longer resolve.
                if (Env.getCurrentSystemInfo().getBackend(beId) != null) {
                    backendIds.add(beId);
                }
            }
        }
        return backendIds;
    }
}