首页 > 解决方案 > 使用 python 从 kafka 读取最新偏移量

问题描述

我正在使用 confluent-kafka Python 库从 kafka 中读取。我正在使用以下消费者设置

Consumer ={
"bootstrap.servers" : kafka_server,
"group_id" : "testing",
"auto.offset.reset" : "latest"}

我的目标是确保我始终阅读 kafka 中的最新消息。只要程序继续运行,上述方法就可以工作。但是,如果程序由于某种原因崩溃,它会从上次使用的消息开始读取,而不是从主题中的最后一条消息开始读取。

我不介意丢失一些消息,但我始终阅读最新消息是绝对必要的。看起来消费者记住了偏移量并从它开始而不是从最新的偏移量开始。

我尝试将enable.auto.commit参数设置为 False,但我得到了相同的结果。

标签: apache-kafkaconfluent-kafka-python

解决方案


enable.auto.commit 应该是 true,如果你想实现这种情况。

由于您有 enable.auto.commit='false',这意味着您的代码(消费者)有责任提交偏移量。如果发生崩溃,它可能无法保证提交偏移量,这会导致您的应用程序从最后一条消费消息开始。

配置“最新”并不意味着消费者会跳过消息并处理最新消息。


推荐阅读