首页 > 解决方案 > Kafka主题中的唯一消息检查

问题描述

我们使用 Logstash,我们想从 Oracle 数据库中读取一张表并将这些消息(如下所示)发送到 Kafka:

Topic1: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"2019-07-30"}
        message4: {"name":"name-4", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}

请注意message1message4same ID number.

现在,我们要确保所有消息都是唯一的,那么我们如何过滤topic1和唯一所有消息然后发送到topic2

我们想要的最终结果:

Topic2: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"

标签: apache-kafkalogstash

解决方案


这被称为一次性处理

您可能对Kafka FAQ的第一部分感兴趣,该部分描述了一些关于如何避免数据生产重复的方法(即从生产者方面):

Exactly once 语义有两部分:在数据生产过程中避免重复和在数据消费过程中避免重复。

有两种方法可以在数据生产期间获得恰好一次的语义:

  1. 每个分区使用一个写入器,每次遇到网络错误时,检查该分区中的最后一条消息,看看你的最后一次写入是否成功
  2. 在消息中包含主键(UUID 或其他内容)并对使用者进行重复数据删除。

如果你做这些事情之一,Kafka 托管的日志将是无重复的。然而,没有重复的阅读也取决于消费者的一些合作。如果消费者定期检查其位置,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么这里也可能会出现重复。此问题特定于您的存储系统。例如,如果您使用的是数据库,则可以在事务中将它们一起提交。LinkedIn 编写的 HDFS 加载器 Camus 为 Hadoop 加载做了类似的事情。不需要事务的另一种替代方法是使用加载的数据存储偏移量,并使用主题/分区/偏移量组合进行重复数据删除。

我认为有两个改进可以使这更容易:

  1. 通过选择性地在服务器上集成对此的支持,生产者幂等性可以自动完成,而且成本更低。
  2. 现有的高级消费者并没有公开很多更细粒度的偏移控制(例如重置您的位置)。我们将尽快解决这个问题

另一种选择(这不是您正在寻找的),将是日志压缩。假设您的重复消息具有相同的密钥,当日志压缩策略有效时,日志压缩最终将删除重复消息。


推荐阅读