apache-kafka - Kafka excatly-once 生产者消费者
问题描述
我正在为一个简单的数据管道实现 Exactly-once 语义,使用 Kafka 作为消息代理。我可以通过设置强制 Kafka 生产者将每个生产的记录只写一次 set enable.idempotence=true
。
但是,在消费方面,我需要保证消费者只读取每条记录一次(我对将消费的记录存储到外部系统或另一个正在处理的 Kafka 主题不感兴趣)。为了实现这一点,我必须确保轮询记录得到处理,并且它们的偏移量以__consumer_offsets
原子/事务方式提交给主题(同时成功/失败)。
在这种情况下,我是否需要求助于 Kafka 事务 API 在消费者轮询循环中创建事务生产者,在事务内部我执行:(1)处理消费记录和(2)在关闭事务之前提交它们的偏移量. 在这种情况下,正常的 commitSync/commitAsync 会服务吗?
解决方案
“在消费方面,我需要保证消费者每条记录只读一次”
Gopinath 的回答很好地解释了如何在 KafkaProducer 和 KafkaConsumer 之间实现精确一次。这些配置(连同 KafkaProducer 中 Transaction API 的应用)保证了生产者发送的所有数据都将在 Kafka 中存储一次。但是,它不能保证消费者只读取一次数据。当然,这取决于您的偏移管理。
无论如何,我理解你的问题,你想知道消费者本身是如何处理消费消息的。
为此,您需要以原子方式自行管理偏移量。这意味着,您需要建立自己的“交易”
- 从 Kafka 获取数据,
- 处理数据,以及
- 在外部存储已处理的偏移量。
方法 commitSync 和 commitAsync 不会让您走得太远,因为它们只能确保在消费者中进行最多一次或至少一次处理。此外,您的处理是幂等的。
有一个不错的博客解释了这种使用ConsumerRebalanceListener
和存储本地文件系统中的偏移量的实现。还提供了完整的代码示例。
“我是否需要借助 Kafka 事务 API 在消费者轮询循环中创建事务生产者”
Transaction API 仅适用于 KafkaProducers,据我所知,不能用于您的偏移管理。
推荐阅读
- javascript - `import app from "./app"` 不起作用,但它应该可以工作
- python - 无法在 Ubuntu 19.10 中安装 python3.6 virtualenv(“没有名为 apport 的模块”)
- node.js - Express 验证器自定义问题
- c++ - 为什么我在此代码中收到运行时错误 SIGSEGV
- mongodb - mongodb中的嵌套查找
- python - 通过python向许多用户发送推送通知
- r - 从头开始的闪亮小工具
- python - 如何使用时间索引转发fillna
- c - 在 C 中从 char* 有效负载构造结构
- javascript - 如何为我的组件实现滑动逻辑?