apache-kafka - 如何不在 KafkaListener 中提交偏移量
问题描述
我正在使用 KafkaListener 并希望根据消息是否成功处理来控制 Offsets 的提交。为此,我使用下面的代码,但它在 nack(index, sleep) 上引发异常:
@KafkaListener(topics="topic-name", groupId="group"){
someMethod(@Header(KafkaHeaders.OFFSET) int offset, Acknowlegement ack, String message){
try{
processMessage(message);
ack.acknowledge();
}catch(Exception e){
ack.nack(offset, 1000);
}
}
我已将 ackMode 设置为 Manual 并且 Auto Commit 属性设置为 false。消息处理失败时 nack 抛出异常:
Exception: nack(index, sleep) is not supported by this Acknowledgement.
也欢迎任何其他处理该场景的方式。
解决方案
nack(index, sleep)
用于批处理侦听器List<String> messages
。索引是告诉容器列表中的哪条消息失败了。
对于记录侦听器使用nack(sleep)
- 容器已经知道哪个记录失败。
请参阅文档。
从 2.3 版开始,Acknowledgement 接口有两个额外的方法
nack(long sleep)
和nack(int index, long sleep)
. 第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发IllegalStateException
.使用记录侦听器,当调用 nack() 时,将提交任何未决的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次轮询时重新传递失败的记录和未处理的记录( )。通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。这与在容器配置了 SeekToCurrentErrorHandler 时引发异常的功能类似。
使用批处理侦听器时,您可以指定批处理中发生故障的索引。当调用 nack() 时,将为索引之前的记录提交偏移量,并在分区上执行失败和丢弃记录的搜索,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,后者只能寻找整个批次进行重新投递。
推荐阅读
- angular - Service Worker 不适用于 Azure 静态 Web 应用上的自定义域
- list - 谷歌应用脚本中的比较列表
- jsp - JSTL 语句出现在生成的 HTML 中的错误位置和格式
- date - Drupal - 从“日期”字段中获取“年份”值
- statistics - 最小二乘 (LS) 和普通最小二乘 (OLS) 的区别
- python-3.x - 在具有匹配键的字典集中查找条目
- python-3.x - 使用 Beautiful Soup 从 Wikipedia 表创建 CSV 文件
- wordpress - 将永久链接重定向到 especif wordpress 模板
- python - 用于排除某些单词,同时匹配其他单词的正则表达式
- node.js - 反应:从节点 js 后端上传的图像未呈现