java - 避免 Kafka 生产者消息的重复
问题描述
我正在使用KafkaTemplate
Spring boot.Java 8
我的主要目标是消费者不应该两次消费消息。
1) 调用一个表获取100行并发送给kafka
2)假设我处理了 70 行(我得到了成功 ack)然后 Kafka 宕机了(Kafka 没有在 RETRY 机制计时内恢复)
因此,当我重新启动 Spring Boot 应用程序时,我如何确保不再发送这 70 条消息。
一种选择是我可以在 DB 表消息中有标志is_sent = Y or N
。
有没有其他有效的方法?
解决方案
我将使用JDBC 源连接器(取决于您当前使用的数据库)和Kafka Connect来正确处理这种情况。
如果您仍然想编写自己的生产者, Kafka FAQ 的这一部分应该很有用:
如何从 Kafka 获取一次性消息?
Exactly once 语义有两部分:在数据生产过程中避免重复和在数据消费过程中避免重复。
有两种方法可以在数据生产期间获得恰好一次的语义:
- 每个分区使用一个写入器,每次遇到网络错误时,检查该分区中的最后一条消息,看看你的最后一次写入是否成功
- 在消息中包含主键(UUID 或其他内容)并对使用者进行重复数据删除。
如果你做这些事情之一,Kafka 托管的日志将是无重复的。然而,没有重复的阅读也取决于消费者的一些合作。如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么这里也可能会出现重复。此问题特定于您的存储系统。例如,如果您使用的是数据库,则可以在事务中将它们一起提交。LinkedIn 编写的 HDFS 加载器 Camus 为 Hadoop 加载做了类似的事情。不需要事务的另一种替代方法是使用加载的数据存储偏移量,并使用主题/分区/偏移量组合进行重复数据删除。
我认为有两个改进可以使这更容易:
- 通过选择性地在服务器上集成对此的支持,生产者幂等性可以自动完成,而且成本更低。
- 现有的高级消费者并没有公开很多更细粒度的偏移控制(例如重置您的位置)。我们将尽快解决这个问题
推荐阅读
- mysql - 用sqlalchemy同时执行多条sql语句
- javascript - 如何从表数据中访问选中的行
- google-apps-script - 如何使用嵌入在 Google 协作平台中的网络应用链接到另一个页面?
- regex - 使用 ANTLR 编写描述以下 Pascal 标记的正则表达式
- d3.js - 如何在 Chrome 中设置 `textPath` 的 `side: right`?
- nginx - 使用 Lets Encrypt SSL、HTTP 2 和重定向的 NGINX 服务器配置
- java - 从 Java 执行任何查询时如何获取数据库消息?
- javascript - 将数组文本框添加到 jquery 并传递给 ajax 并添加到数据库
- python - 单独保存模板匹配 OpenCV Python
- sql - 选择中的虚拟列,这反过来用 SQL Server 中的另一个选择填充此列