首页 > 解决方案 > Kafka 消息失败后的延迟

问题描述

我有一个用@Listener 注释的 sping boot Kafka 消费者。这很好用。我想通过将失败的消息发送到另一个主题(DQL)来处理它们。使用 ErrorHandler 仍然很容易。

故障情况有两种:

第一种情况可以通过暂停容器很好地处理。发生反序列化错误时使用的调用传递容器实例。但是......如果投票顺利并且收到了所有 500 条消息,我无法阻止它们被发送给消费者。因此,如果在消息处理过程中出现问题,它将始终使完整的 500 条消息失败。我不知道如何以一种好的方式停止批处理。

有人有建议吗?

我还尝试在出现故障后降低 maxPollRecords ,但这在很大程度上依赖于反射,并且远不是阻止系统充斥故障的好方法。

标签: spring-bootapache-kafka

解决方案


您可以根据您的实施要求以任何一种方式利用 DLQ。

  1. 如果您使用的是 Kafka Connect,我会要求您参考confluent 博客

  2. 如果您使用的是 Kafka Streams,请参阅此kafka 文档

  3. 如果您有自己的生产者和消费者,那么在您的 try-block 中,添加 kafka 消费者逻辑,如果发生任何异常,则将消息发送到“死信队列(DLQ)”主题。如果没有错误,则在转换后将消息发送到“目标主题”。


推荐阅读