apache-kafka - Kafka消费者分区在暂停和恢复时重新处理未提交的消息
问题描述
我正在寻找在 kafka 消费者暂停和恢复后重新处理未提交消息的选项。有人可以建议我使用最好的方法吗?选择暂停/恢复而不是停止和重新启动使用者,以避免重新平衡作为应用程序中错误处理的一部分。
明智地重新平衡它可以正常工作,正如我们预期的那样,暂停/恢复容器,但唯一的问题是它不会在恢复后重新处理未提交的消息。有什么方法可以重新处理那些未提交的消息?
停止和重新启动面临的另一个问题是,一旦在错误情况下停止后启动消费者,消费者就能够拾取未提交的消息并重试它,而不会像预期的那样增加偏移量。如果我发布任何其他可以成功处理的消息,则此过程仍在继续,然后在它没有拾取上一个失败/未提交的消息之后,它正在从轮询中的下一个偏移量中查找消息。我们如何处理这种情况?
如果寻找当前偏移量是一种选择,那么有人可以提供任何工作示例,在这种情况下寻找当前偏移量实现吗?
简而言之,提供的配置是:
-> 处理单个记录而不是批处理。-> Enable Auto Commit Config 提供为 false 并遵循 MANUAL_IMMEDIATE ack 模式。-> 使用并发定义为 1 的 kafkaListenerContainerFactory。
非常感谢对此的建议!
解决方案
推荐阅读
- python - Tkinter 命令仅在调用时解析参数
- java - 如何在使用 xPath.evaluate() 时选择通过检查的节点,而不是它们的父节点
- java - 使用 log4j 记录 spring 特定的日志
- python - QsciScintilla 中的 QSyntaxHighlighter 的 setCurrentBlockState 是否有任何等价物?
- cassandra - 将数据从本地 cassandra 集群流式传输到 AWS 时出错(版本:2.1.19)
- java - 为什么在运行欢迎文件列表中出现的 servlet 时 url 不改变
- reactjs - 无论如何在子组件中从父级执行 React JS 钩子?
- android - 实施范围存储后应用程序没有响应
- python - 如何解决 Import "django_filters.views" could not be resolvedPylance 错误
- javascript - 如何使用赛普拉斯获取夹具中文件的完整路径?