首页 > 解决方案 > 避免 Kafka 生产者消息的重复

问题描述

我正在使用KafkaTemplateSpring boot.Java 8

我的主要目标是消费者不应该两次消费消息。

1) 调用一个表获取100行并发送给kafka

2)假设我处理了 70 行(我得到了成功 ack)然后 Kafka 宕机了(Kafka 没有在 RETRY 机制计时内恢复)

因此,当我重新启动 Spring Boot 应用程序时,我如何确保不再发送这 70 条消息。

一种选择是我可以在 DB 表消息中有标志is_sent = Y or N

有没有其他有效的方法?

标签: javaapache-kafkaspring-kafkakafka-producer-api

解决方案


我将使用JDBC 源连接器(取决于您当前使用的数据库)和Kafka Connect来正确处理这种情况。


如果您仍然想编写自己的生产者, Kafka FAQ 的这一部分应该很有用:

如何从 Kafka 获取一次性消息?

Exactly once 语义有两部分:在数据生产过程中避免重复和在数据消费过程中避免重复。

有两种方法可以在数据生产期间获得恰好一次的语义:

  1. 每个分区使用一个写入器,每次遇到网络错误时,检查该分区中的最后一条消息,看看你的最后一次写入是否成功
  2. 在消息中包含主键(UUID 或其他内容)并对使用者进行重复数据删除。

如果你做这些事情之一,Kafka 托管的日志将是无重复的。然而,没有重复的阅读也取决于消费者的一些合作。如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么这里也可能会出现重复。此问题特定于您的存储系统。例如,如果您使用的是数据库,则可以在事务中将它们一起提交。LinkedIn 编写的 HDFS 加载器 Camus 为 Hadoop 加载做了类似的事情。不需要事务的另一种替代方法是使用加载的数据存储偏移量,并使用主题/分区/偏移量组合进行重复数据删除。

我认为有两个改进可以使这更容易:

  1. 通过选择性地在服务器上集成对此的支持,生产者幂等性可以自动完成,而且成本更低。
  2. 现有的高级消费者并没有公开很多更细粒度的偏移控制(例如重置您的位置)。我们将尽快解决这个问题

推荐阅读