spring-boot - Kafka 消息失败后的延迟
问题描述
我有一个用@Listener 注释的 sping boot Kafka 消费者。这很好用。我想通过将失败的消息发送到另一个主题(DQL)来处理它们。使用 ErrorHandler 仍然很容易。
故障情况有两种:
- 反序列化问题,因此不调用消费者
- 消费者的例外
第一种情况可以通过暂停容器很好地处理。发生反序列化错误时使用的调用传递容器实例。但是......如果投票顺利并且收到了所有 500 条消息,我无法阻止它们被发送给消费者。因此,如果在消息处理过程中出现问题,它将始终使完整的 500 条消息失败。我不知道如何以一种好的方式停止批处理。
有人有建议吗?
我还尝试在出现故障后降低 maxPollRecords ,但这在很大程度上依赖于反射,并且远不是阻止系统充斥故障的好方法。
解决方案
您可以根据您的实施要求以任何一种方式利用 DLQ。
如果您使用的是 Kafka Connect,我会要求您参考confluent 博客
如果您使用的是 Kafka Streams,请参阅此kafka 文档。
如果您有自己的生产者和消费者,那么在您的 try-block 中,添加 kafka 消费者逻辑,如果发生任何异常,则将消息发送到“死信队列(DLQ)”主题。如果没有错误,则在转换后将消息发送到“目标主题”。
推荐阅读
- mongodb - (如何)聚合()可以破坏索引?
- macos - 奇怪的目录被添加到我的 PATH
- excel - VBA 在 Excel 中的 Word 文档中创建报告
- python - 输出视频与原始视频不同 Open CV
- ios - 仅在滚动时,iFrame 中的延迟加载页面在 iOS 设备上(垂直)被切断
- pandas - Pandas:仅将列中的某些字符串大写
- g1ant - 带有参数相对真或相对假的颜色命令输出?
- android - Android Firebase addOnChildListener 致命异常“打印需要消息”并且没有设置器/字段
- python - Python numpy 数组切片不是 Fortran 连续的
- google-apps-script - 当NumberLessThan 无法引用单元格时,Google Apps 脚本条件格式