KafkaUtil.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.kafka;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

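// Helper for fetching Kafka metadata (partition ids and offsets) for routine load.
// The frontend does not talk to Kafka directly; every request here is proxied
// through a backend (BE) via BackendServiceProxy.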
public class KafkaUtil {
    private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);

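    // Get all partition ids of the given topic by proxying a meta request through a BE.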
    public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
            Map<String, String> convertedCustomProperties) throws UserException {
        try {
            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                    InternalService.PKafkaMetaProxyRequest.newBuilder()
                            .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
                                    .setBrokers(brokerList)
                                    .setTopic(topic)
                                    .addAllProperties(convertedCustomProperties.entrySet().stream()
                                            .map(e -> InternalService.PStringPair.newBuilder().setKey(e.getKey())
                                                    .setVal(e.getValue()).build()).collect(Collectors.toList())
                                    )
                            )
            ).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
            return getInfoRequest(request, Config.max_get_kafka_meta_timeout_second)
                    .getKafkaMetaResult().getPartitionIdsList();
        } catch (Exception e) {
            throw new LoadException(
                    "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
        }
    }

    // Get offsets by times.
    // The input parameter "timestampOffsets" is a list of <partition, timestamp> pairs.
    // The return value is a list of <partition, offset> pairs.
    public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
            Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
            throws LoadException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
        }
        try {
            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                    InternalService.PKafkaMetaProxyRequest.newBuilder()
                            .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
                                    .setBrokers(brokerList)
                                    .setTopic(topic)
                                    .addAllProperties(
                                            convertedCustomProperties.entrySet().stream().map(
                                                    e -> InternalService.PStringPair.newBuilder()
                                                            .setKey(e.getKey())
                                                            .setVal(e.getValue())
                                                            .build()
                                            ).collect(Collectors.toList())
                                    )
                            );
            for (Pair<Integer, Long> pair : timestampOffsets) {
                metaRequestBuilder.addOffsetTimes(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
                        .setVal(pair.second).build());
            }

            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);

            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
            for (InternalService.PIntegerPair pair : pairs) {
                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
            }
            return partitionOffsets;
        } catch (Exception e) {
            LOG.warn("failed to get offsets for times.", e);
            throw new LoadException(
                    "Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage());
        }
    }

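    // Get the latest offset of each partition in "partitionIds".
    // The return value is a list of <partition, offset> pairs.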
    public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
                                                             Map<String, String> convertedCustomProperties,
                                                             List<Integer> partitionIds) throws LoadException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                    partitionIds, topic, taskId, jobId);
        }
        try {
            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                    InternalService.PKafkaMetaProxyRequest.newBuilder()
                            .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
                                    .setBrokers(brokerList)
                                    .setTopic(topic)
                                    .addAllProperties(
                                            convertedCustomProperties.entrySet().stream().map(
                                                    e -> InternalService.PStringPair.newBuilder()
                                                            .setKey(e.getKey())
                                                            .setVal(e.getValue())
                                                            .build()
                                            ).collect(Collectors.toList())
                                    )
                            );
            for (Integer partitionId : partitionIds) {
                metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
            }
            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);

            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
            for (InternalService.PIntegerPair pair : pairs) {
                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                        partitionOffsets, topic, taskId, jobId);
            }
            return partitionOffsets;
        } catch (Exception e) {
            LOG.warn("failed to get latest offsets.", e);
            throw new LoadException(
                    "Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
        }
    }

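    // Resolve special offset flags (negative values such as OFFSET_BEGINNING or OFFSET_END)
    // to real offsets. Entries whose offset is already >= 0 are returned unchanged.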
    public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String topic,
                                                             Map<String, String> convertedCustomProperties,
                                                             List<Pair<Integer, Long>> offsets)
                                                             throws LoadException {
        // Offsets >= 0 are already real offsets and are returned as-is.
        // Only special offset flags such as OFFSET_BEGINNING or OFFSET_END (negative values)
        // need to be resolved against the Kafka cluster.
        List<Pair<Integer, Long>> offsetFlags = new ArrayList<>();
        List<Pair<Integer, Long>> realOffsets = new ArrayList<>();
        for (Pair<Integer, Long> pair : offsets) {
            if (pair.second < 0) {
                offsetFlags.add(pair);
            } else {
                realOffsets.add(pair);
            }
        }
        if (offsetFlags.isEmpty()) {
            LOG.info("no offsets need to be resolved, directly return offsets for partitions {} in topic: {}",
                    offsets, topic);
            return offsets;
        }

        try {
            InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                    InternalService.PKafkaMetaProxyRequest.newBuilder()
                            .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
                                    .setBrokers(brokerList)
                                    .setTopic(topic)
                                    .addAllProperties(
                                            convertedCustomProperties.entrySet().stream().map(
                                                    e -> InternalService.PStringPair.newBuilder()
                                                            .setKey(e.getKey())
                                                            .setVal(e.getValue())
                                                            .build()
                                            ).collect(Collectors.toList())
                                    )
                            );
            for (Pair<Integer, Long> pair : offsetFlags) {
                metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first)
                        .setVal(pair.second).build());
            }
            InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                    metaRequestBuilder).setTimeoutSecs(Config.max_get_kafka_meta_timeout_second).build();
            InternalService.PProxyResult result = getInfoRequest(request, Config.max_get_kafka_meta_timeout_second);

            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
            for (InternalService.PIntegerPair pair : pairs) {
                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
            }
            realOffsets.addAll(partitionOffsets);
            LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
            return realOffsets;
        } catch (Exception e) {
            LOG.warn("failed to get real offsets.", e);
            throw new LoadException(
                    "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
        }
    }

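    // Send the proxy request to a randomly chosen available BE and return its result.
    // Up to 3 different BEs are tried; BEs that failed are added to the routine load blacklist
    // only if the request eventually succeeds (see the comment in the finally block).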
    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
                                                        throws LoadException {
        long startTime = System.currentTimeMillis();
        int retryTimes = 0;
        TNetworkAddress address = null;
        Future<InternalService.PProxyResult> future = null;
        InternalService.PProxyResult result = null;
        Set<Long> failedBeIds = new HashSet<>();
        TStatusCode code = null;

        try {
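            // Try up to 3 different BEs before giving up; BEs that already failed for this request are skipped.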
            while (retryTimes < 3) {
                List<Long> backendIds = new ArrayList<>();
                for (Long beId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
                    Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
                    if (backend != null && backend.isLoadAvailable()
                            && !backend.isDecommissioned()
                            && !failedBeIds.contains(beId)
                            && !Env.getCurrentEnv().getRoutineLoadManager().isInBlacklist(beId)) {
                        backendIds.add(beId);
                    }
                }
                // If there are no available backends, fall back to the blacklisted ones.
                // Example scenario:
                // 1. A job that connects to Kafka may time out due to a topic configuration or network error,
                //    blacklisting backends until only one remains operational.
                // 2. If that sole backend is then decommissioned, the list of available backends becomes empty.
                // In such cases we must rely on the blacklist to obtain the meta information.
                if (backendIds.isEmpty()) {
                    for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
                        backendIds.add(beId);
                    }
                }
                if (backendIds.isEmpty()) {
                    MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
                    throw new LoadException("Failed to get info. No alive backends");
                }
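                // Pick a random BE from the remaining candidates to spread meta requests across the cluster.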
                Collections.shuffle(backendIds);
                Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
                address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
                long beId = be.getId();

                try {
                    future = BackendServiceProxy.getInstance().getInfo(address, request);
                    result = future.get(timeout, TimeUnit.SECONDS);
                } catch (Exception e) {
                    LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
                    failedBeIds.add(beId);
                    retryTimes++;
                    continue;
                }
                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                if (code != TStatusCode.OK) {
                    LOG.warn("failed to get info request to "
                            + address + " err " + result.getStatus().getErrorMsgsList());
                    failedBeIds.add(beId);
                    retryTimes++;
                } else {
                    return result;
                }
            }

            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
            throw new LoadException("Failed to get info");
        } finally {
            // Ensure that not all BEs are added to the blacklist.
            // For a single request:
            //     Failed BEs are blacklisted only when the request eventually succeeds,
            //     so there is always at least one BE that is not on the blacklist.
            // Across multiple requests:
            //     If only one BE is left un-blacklisted after repeated jitters,
            //     it will not be blacklisted even if it fails.
            if (code != null && code == TStatusCode.OK && !failedBeIds.isEmpty()) {
                for (Long beId : failedBeIds) {
                    Env.getCurrentEnv().getRoutineLoadManager().addToBlacklist(beId);
                    LOG.info("add beId {} to blacklist, blacklist: {}", beId,
                            Env.getCurrentEnv().getRoutineLoadManager().getBlacklist());
                }
            }
            long endTime = System.currentTimeMillis();
            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_LANTENCY.increase(endTime - startTime);
            MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_COUNT.increase(1L);
        }
    }
}