首页 > 解决方案 > Kafka消费者分区在暂停和恢复时重新处理未提交的消息

问题描述

我正在寻找在 kafka 消费者暂停和恢复后重新处理未提交消息的选项。有人可以建议我使用最好的方法吗?选择暂停/恢复而不是停止和重新启动使用者,以避免重新平衡作为应用程序中错误处理的一部分。

明智地重新平衡它可以正常工作,正如我们预期的那样,暂停/恢复容器,但唯一的问题是它不会在恢复后重新处理未提交的消息。有什么方法可以重新处理那些未提交的消息?

停止和重新启动面临的另一个问题是,一旦在错误情况下停止后启动消费者,消费者就能够拾取未提交的消息并重试它,而不会像预期的那样增加偏移量。如果我发布任何其他可以成功处理的消息,则此过程仍在继续,然后在它没有拾取上一个失败/未提交的消息之后,它正在从轮询中的下一个偏移量中查找消息。我们如何处理这种情况?

如果寻找当前偏移量是一种选择,那么有人可以提供任何工作示例,在这种情况下寻找当前偏移量实现吗?

简而言之,提供的配置是:

-> 处理单个记录而不是批处理。-> Enable Auto Commit Config 提供为 false 并遵循 MANUAL_IMMEDIATE ack 模式。-> 使用并发定义为 1 的 kafkaListenerContainerFactory。

非常感谢对此的建议!

标签: apache-kafka

解决方案


推荐阅读