首页 > 解决方案 > Kafka 在重新平衡期间最后生成和提交的偏移量

问题描述

下面的代码片段使用 kafka 管理客户端来检索某个消费者组(即 CONSUMER_GROUP)的所有分区的最后提交和生成的偏移量:

Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for(TopicPartition tp: offsets.keySet()) {
    requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
    String topic = e.getKey().topic();
    int partition = e.getKey().partition();
    long committedOffset = e.getValue().offset();
    long latestOffset = latestOffsets.get(e.getKey()).offset();
    System.out.println("Consumer group " + CONSUMER_GROUP + " has committed offset " + committedOffset + " to topic " + topic + " partition " + partition  + ". The latest offset in the partition is " + latestOffset + " so consumer  group is "+ (latestOffset - committedOffset) + " records behind");
}

您能否就以下几点提示我:

  1. 如果消费者组处于重新平衡状态(或另一个不稳定状态),那会(如何)影响返回的最后一个和提交的偏移量?[假设我手动提交偏移量并使用消费者重新平衡侦听器在重新平衡之前提交最新处理的偏移量]

  2. 鉴于上面的代码,当管理请求由 Kafka 代理提供服务时,是否有任何特定方法可以知道该组是否正在进行重新平衡(或通常处于不稳定状态)?

  3. 如果我有兴趣检索不属于任何消费者组的特定主题分区的最新偏移量,我该如何实现呢?因此,为什么最新的偏移量绑定到特定的消费者组,因为它们是由生产者发送的,而与任何消费者组无关?

谢谢你。

标签: apache-kafkakafka-consumer-api

解决方案


推荐阅读