apache-kafka - 使用 python 从 kafka 读取最新偏移量
问题描述
我正在使用 confluent-kafka Python 库从 kafka 中读取。我正在使用以下消费者设置
Consumer ={
"bootstrap.servers" : kafka_server,
"group_id" : "testing",
"auto.offset.reset" : "latest"}
我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是,如果程序由于某种原因崩溃,它会从上次使用的消息开始读取,而不是从主题中的最后一条消息开始读取。
我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。
我尝试将enable.auto.commit
参数设置为 False,但我得到了相同的结果。
解决方案
enable.auto.commit 应该是 true,如果你想实现这种情况。
由于您有 enable.auto.commit='false',这意味着您的代码(消费者)有责任提交偏移量。如果发生崩溃,它可能无法保证提交偏移量,这会导致您的应用程序从最后一条消费消息开始。
配置“最新”并不意味着消费者会跳过消息并处理最新消息。
推荐阅读
- ios - 如何在 BarButtonitem 中检测长按
- sql - 在 SQL Server 中添加两个日期时间列并将结果除以 2
- angular - 如何在 Angular 8 中的组件之间共享数据
- nsis - How to add directory and sub directories in installer package
- sql - 在 postgres 表中计算不为空的 json 键-> 值
- javascript - 关于 vuejs 示例中的 promise 的解释请求
- recursion - F# define search function
- python - 获取特定维度的变量索引值
- php - 如何在 yii2 php 中设置 pdf 密码保护
- java - 如何在 JUnit 5 测试中使用 WireMock 的响应模板