首页 > 解决方案 > Consumer.endOffsets 如何在 Kafka 中工作?

问题描述

假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交的偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,但它适用于所有组。

就像是

单一消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

多个消费者 - 工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

版本:Kafka-Client 2.0.0

我是否错误地使用了消费者 API?理想情况下,我想使用单一消费者。

如果您需要更多详细信息,请告诉我。

标签: javaapache-kafkakafka-consumer-apispring-kafka

解决方案


我想你快到了。首先收集所有你感兴趣的主题分区,然后发出consumer.endOffsets命令。

请记住,我没有尝试运行它,但是这样的事情应该可以工作:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}

推荐阅读