首页 > 解决方案 > Kafka excatly-once 生产者消费者

问题描述

我正在为一个简单的数据管道实现 Exactly-once 语义,使用 Kafka 作为消息代理。我可以通过设置强制 Kafka 生产者将每个生产的记录只写一次 set enable.idempotence=true

但是,在消费方面,我需要保证消费者只读取每条记录一次(我对将消费的记录存储到外部系统或另一个正在处理的 Kafka 主题不感兴趣)。为了实现这一点,我必须确保轮询记录得到处理,并且它们的偏移量以__consumer_offsets原子/事务方式提交给主题(同时成功/失败)。

在这种情况下,我是否需要求助于 Kafka 事务 API 在消费者轮询循环中创建事务生产者,在事务内部我执行:(1)处理消费记录和(2)在关闭事务之前提交它们的偏移量. 在这种情况下,正常的 commitSync/commitAsync 会服务吗?

标签: apache-kafkakafka-consumer-apikafka-producer-api

解决方案


“在消费方面,我需要保证消费者每条记录只读一次”

Gopinath 的回答很好地解释了如何在 KafkaProducer 和 KafkaConsumer 之间实现精确一次。这些配置(连同 KafkaProducer 中 Transaction API 的应用)保证了生产者发送的所有数据都将在 Kafka 中存储一次。但是,它不能保证消费者只读取一次数据。当然,这取决于您的偏移管理。

无论如何,我理解你的问题,你想知道消费者本身是如何处理消费消息的。

为此,您需要以原子方式自行管理偏移量。这意味着,您需要建立自己的“交易”

  • 从 Kafka 获取数据,
  • 处理数据,以及
  • 在外部存储已处理的偏移量。

方法 commitSync 和 commitAsync 不会让您走得太远,因为它们只能确保在消费者中进行最多一次或至少一次处理。此外,您的处理是幂等的。

有一个不错的博客解释了这种使用ConsumerRebalanceListener和存储本地文件系统中的偏移量的实现。还提供了完整的代码示例。

“我是否需要借助 Kafka 事务 API 在消费者轮询循环中创建事务生产者”

Transaction API 仅适用于 KafkaProducers,据我所知,不能用于您的偏移管理。


推荐阅读