首页 > 解决方案 > 如何实现多线程Kafka生产者的插入顺序

问题描述

我有一个带有历史记录表的 Ingres 数据库,它记录插入更新和删除等数据库事件。我有一个生产者,它将是多线程的。该生产者将读取 History 表以查找要选择的表和行,然后将该行添加到 Kafka 主题。现在生产者需要确保以与历史表登录相同的方式将事件添加到 Kafka 主题。因此,消费者按照记录在历史记录表中的相同顺序读取它们并在 Postgrace DB 上执行它。

我可以将这些数据生成到多个生产者中。例子

Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message 11 to 15

但是当我消费时,我会收到有关诸如

messageId 1
messageId 2
messageId 3
messageId 6
messageId 7
messageId 11

等等

我想按以下顺序获取所有消息

messageId 1
messageId 2
messageId 3
messageId 4
messageId 5
messageId 6
messageId 7
messageId 8
messageId 9

等等

注意: - 我有 1 个主题和 1 个分区和 1 个消费者

标签: javaapache-kafka

解决方案


您最多可以通过将消息发送到单个分区来保持消息由生产者创建的顺序。Kafka 分区保证了消息的消费顺序,按照它在分区中创建的顺序。

在您的场景中,消息是由多个生产者生成的,并且它们不同步以按顺序用消息填充分区。因此,不可能像您期望的那样在消费者端实现订单。


推荐阅读