首页 > 解决方案 > Kafka消费者:如果未提交先前的消息偏移量并且禁用自动提交,则希望再次读取相同的消息

问题描述

我已经关闭了自动提交,并且在阅读后也没有从消费者那里提交偏移量。

检查的消费者滞后也保持不变,它确保偏移量不会被提交。但问题是,它再次消耗下一条消息而不是相同的消息。

我怎样才能一次又一次地阅读相同的消息。只有提交了先前的偏移量,我才应该能够阅读下一条消息。请在这里帮我做这件事。

标签: apache-kafkakafka-consumer-api

解决方案


如果您知道您的 kafka 消费者当前正在访问哪个分区,您可以使用kafkaconsumer.seek(partition, offset)方法告诉您的消费者从特定偏移量读取消息。例子:

//to get the partition from consumer record
val partition: Int = consumerRecord.partition() 

//to get offset of current record
val recordOffset = consumerRecord.offset() 

if(data processing fail condition)
  consumer.seek(new TopicPartition(topic, partition), recordOffset )

//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100) 

推荐阅读