KafkaProgress.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.routineload;

import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKafkaRLTaskProgress;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * This class describes the progress of a Kafka routine load job.
 * The data before the saved offset has already been loaded into Doris.
 */
// {"partitionIdToOffset": {}}
public class KafkaProgress extends RoutineLoadProgress {
    private static final Logger LOG = LogManager.getLogger(KafkaProgress.class);

    public static final String OFFSET_BEGINNING = "OFFSET_BEGINNING"; // -2
    public static final String OFFSET_END = "OFFSET_END"; // -1
    // OFFSET_ZERO is only used for display, when the user-specified offset is 0
    public static final String OFFSET_ZERO = "OFFSET_ZERO";

    public static final long OFFSET_BEGINNING_VAL = -2;
    public static final long OFFSET_END_VAL = -1;
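    // These sentinel values correspond to the special offsets used by librdkafka
    // (RD_KAFKA_OFFSET_BEGINNING = -2, RD_KAFKA_OFFSET_END = -1).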

    // (partition id, begin offset)
    // the offset saved here is the next offset to be consumed
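    // For example (illustrative): if offsets 0 to 99 of a partition have already been loaded,
    // the offset saved for that partition is 100.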
    @SerializedName(value = "pito")
    private ConcurrentMap<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();

    private ReentrantLock lock = new ReentrantLock(true);

    public KafkaProgress() {
        super(LoadDataSourceType.KAFKA);
    }

    public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) {
        super(LoadDataSourceType.KAFKA);
        this.partitionIdToOffset = new ConcurrentHashMap<>();
        partitionIdToOffset.putAll(tKafkaRLTaskProgress.getPartitionCmtOffset());
    }

    public KafkaProgress(Map<Integer, Long> partitionIdToOffset) {
        super(LoadDataSourceType.KAFKA);
        this.partitionIdToOffset = new ConcurrentHashMap<>();
        this.partitionIdToOffset.putAll(partitionIdToOffset);
    }

    public KafkaProgress(ConcurrentMap<Integer, Long> partitionIdToOffset) {
        super(LoadDataSourceType.KAFKA);
        this.partitionIdToOffset = partitionIdToOffset;
    }

    public ConcurrentMap<Integer, Long> getPartitionIdToOffset(List<Integer> partitionIds) {
        ConcurrentMap<Integer, Long> result = Maps.newConcurrentMap();
        for (Integer partitionId : partitionIds) {
            Long offset = partitionIdToOffset.get(partitionId);
            if (offset != null) {
                result.put(partitionId, offset);
            }
        }
        return result;
    }

    public void addPartitionOffset(Pair<Integer, Long> partitionOffset) {
        partitionIdToOffset.put(partitionOffset.first, partitionOffset.second);
    }

    public Long getOffsetByPartition(int kafkaPartition) {
        return partitionIdToOffset.get(kafkaPartition);
    }

    public ConcurrentMap<Integer, Long> getOffsetByPartition() {
        return partitionIdToOffset;
    }

    public boolean containsPartition(Integer kafkaPartition) {
        return partitionIdToOffset.containsKey(kafkaPartition);
    }

    public boolean hasPartition() {
        return !partitionIdToOffset.isEmpty();
    }

    // (partition id, end offset)
    // OFFSET_ZERO: user set offset == 0, no committed msg
    // OFFSET_END: user set offset = OFFSET_END, no committed msg
    // OFFSET_BEGINNING: user set offset = OFFSET_BEGINNING, no committed msg
    // other: current committed msg's offset
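    // For example (illustrative): a saved offset of 100 is shown as 99 (the last consumed offset),
    // while saved values 0, -1 and -2 are shown as OFFSET_ZERO, OFFSET_END and OFFSET_BEGINNING respectively.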
    private void getReadableProgress(ConcurrentMap<Integer, String> showPartitionIdToOffset) {
        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
            if (entry.getValue() == 0) {
                showPartitionIdToOffset.put(entry.getKey(), OFFSET_ZERO);
            } else if (entry.getValue() == -1) {
                showPartitionIdToOffset.put(entry.getKey(), OFFSET_END);
            } else if (entry.getValue() == -2) {
                showPartitionIdToOffset.put(entry.getKey(), OFFSET_BEGINNING);
            } else {
                // The offset saved in partitionIdToOffset is the next offset to be consumed.
                // So here we subtract 1 to show the offset that has already been consumed.
                showPartitionIdToOffset.put(entry.getKey(), "" + (entry.getValue() - 1));
            }
        }
    }

    public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
        for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
            if (!partitionIdToOffset.containsKey(pair.first)) {
                throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
            }
        }
    }

    // Modify the partition offsets of this progress.
    // The specified partitions are expected to exist in the progress (see checkPartitions).
    public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
        for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
            partitionIdToOffset.put(pair.first, pair.second);
        }
    }

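    // Return (partition id, offset) pairs for display.
    // If alreadyConsumed is true, numeric offsets are shifted back by 1 so that they show
    // the last consumed offset instead of the next offset to be consumed.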
    public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
        List<Pair<Integer, String>> pairs = Lists.newArrayList();
        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
            if (entry.getValue() == 0) {
                pairs.add(Pair.of(entry.getKey(), OFFSET_ZERO));
            } else if (entry.getValue() == -1) {
                pairs.add(Pair.of(entry.getKey(), OFFSET_END));
            } else if (entry.getValue() == -2) {
                pairs.add(Pair.of(entry.getKey(), OFFSET_BEGINNING));
            } else {
                long offset = entry.getValue();
                if (alreadyConsumed) {
                    offset -= 1;
                }
                pairs.add(Pair.of(entry.getKey(), "" + offset));
            }
        }
        return pairs;
    }

    // Get the lag of each Kafka partition.
    // `partitionIdWithLatestOffsets` holds the cached latest offset of each partition,
    // which is periodically updated while the job is running.
    // The latest offset saved in `partitionIdWithLatestOffsets` is the next offset of the partition,
    // and the offset saved in `partitionIdToOffset` is the next offset to be consumed.
    // For example, if a partition has 4 messages with offsets 0, 1, 2, 3,
    // the latest offset is 4. If the offset to be consumed is 2,
    // the lag is 4 - 2 = 2.
    public Map<Integer, Long> getLag(Map<Integer, Long> partitionIdWithLatestOffsets) {
        Map<Integer, Long> lagMap = Maps.newHashMap();
        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
            if (partitionIdWithLatestOffsets.containsKey(entry.getKey())) {
                long lag = partitionIdWithLatestOffsets.get(entry.getKey()) - entry.getValue();
                lagMap.put(entry.getKey(), lag);
            } else {
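                // the latest offset of this partition is unknown, so report the lag as -1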
                lagMap.put(entry.getKey(), -1L);
            }
        }
        return lagMap;
    }

    @Override
    public String toString() {
        ConcurrentMap<Integer, String> showPartitionIdToOffset = Maps.newConcurrentMap();
        getReadableProgress(showPartitionIdToOffset);
        return "KafkaProgress [partitionIdToOffset="
                + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
    }

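    // Serialize the readable progress as a JSON object keyed by partition id,
    // e.g. (illustrative): {"0":"99","1":"OFFSET_END"}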
    @Override
    public String toJsonString() {
        ConcurrentMap<Integer, String> showPartitionIdToOffset = Maps.newConcurrentMap();
        getReadableProgress(showPartitionIdToOffset);
        Gson gson = new Gson();
        return gson.toJson(showPartitionIdToOffset);
    }

    @Override
    public void update(RLTaskTxnCommitAttachment attachment) {
        KafkaProgress newProgress = (KafkaProgress) attachment.getProgress();

        // + 1 to point to the next msg offset to be consumed
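        // e.g. (illustrative): if the attachment reports that offset 99 of partition 0 was committed,
        // this progress records 100 for partition 0, i.e. the next offset to consume.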
        if (Config.isCloudMode()) {
            lock.lock();
            try {
                newProgress.partitionIdToOffset.forEach((partitionId, newOffset) -> {
                    this.partitionIdToOffset.compute(partitionId, (key, oldOffset) -> {
                        return (oldOffset == null || newOffset + 1 > oldOffset) ? newOffset + 1 : oldOffset;
                    });
                });
            } finally {
                lock.unlock();
            }
        } else {
            newProgress.partitionIdToOffset.forEach(
                    (partitionId, newOffset) -> this.partitionIdToOffset.put(partitionId, newOffset + 1));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("update kafka progress: {}, task: {}, job: {}",
                    newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());
        }
    }

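    // Deserialize the progress from the legacy binary metadata format.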
    @Deprecated
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        int size = in.readInt();
        partitionIdToOffset = new ConcurrentHashMap<>();
        for (int i = 0; i < size; i++) {
            partitionIdToOffset.put(in.readInt(), in.readLong());
        }
    }

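    // Sum of the saved next-to-consume offsets across all partitions.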
    public Long totalProgress() {
        return partitionIdToOffset.values().stream().reduce(0L, Long::sum);
    }
}