首页 > 解决方案 > 重试消费来自 Kafka 主题的消息

问题描述

我正在开发一个模块,它使用来自 Kafka 主题的消息并发布到下游系统。如果下游系统不可用,消费者不确认 Kakfa 消息。因此,当我的消费者在下游系统不可用时收到消息时,不会提交 kakfa 的偏移量。但是,如果我在下游系统启动后收到新消息并且当我确认该消息时,将提交最新的偏移量,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

即假设我的消费者被消耗到偏移量4。当下游不可用时,消费者会收到两条消息,因此我的消费者没有提交偏移量。所以 toipc 中的消息数现在是 6,但偏移量仍然是 4。现在下游系统可用,消费者收到一条新消息(第 7 条消息)。由于下游没有问题,消费者确认第 7 条消息,主题的偏移量将设置为 7。

有什么方法可以让我的消费者在收到第 7 条消息之前收到第 5 条和第 6 条消息?我在实现中使用了spring cloud stream。

标签: apache-kafkakafka-consumer-apispring-cloud-stream

解决方案


看到这个答案

您需要 aSeekToCurrentErrorHandler并引发异常,以便重置偏移量。


推荐阅读