apache-kafka - Kafka消费者:如果未提交先前的消息偏移量并且禁用自动提交,则希望再次读取相同的消息
问题描述
我已经关闭了自动提交,并且在阅读后也没有从消费者那里提交偏移量。
检查的消费者滞后也保持不变,它确保偏移量不会被提交。但问题是,它再次消耗下一条消息而不是相同的消息。
我怎样才能一次又一次地阅读相同的消息。只有提交了先前的偏移量,我才应该能够阅读下一条消息。请在这里帮我做这件事。
解决方案
如果您知道您的 kafka 消费者当前正在访问哪个分区,您可以使用kafkaconsumer.seek(partition, offset)
方法告诉您的消费者从特定偏移量读取消息。例子:
//to get the partition from consumer record
val partition: Int = consumerRecord.partition()
//to get offset of current record
val recordOffset = consumerRecord.offset()
if(data processing fail condition)
consumer.seek(new TopicPartition(topic, partition), recordOffset )
//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100)
推荐阅读
- swift - 在 Swift 中从日期字符串转换日期
- r - 创建一个带有字幕的 ggplotly 对象
- php - ngix 显示 404 用于同一目录中的 php 文件
- datatable - PrimeVue 编辑行
- php - 如何在 WordPress 中上传时将 JPG 转换为 PNG(通过 add_image_size)?
- javascript - 为什么这里的这段代码第一次返回一个空白数组
- python - 在 Pandas 中扩展 JSON 列
- kotlin - 通过 lambda 的返回类型区分具有 lambda 参数的函数?
- javascript - 喜欢和不喜欢 Reat Js
- rxjs - 如何在一个承诺中加载一个包含所有子集合的 firebase-collection?