python - Python Kafka消费者从一开始就没有收到消息?
问题描述
我在我的 Windows PC 上安装了 Kafka。创建了一个主题quickstart-events
并发送了一些消息。使用参数运行控制台消费者--from-beginning
可以接收消息。
.\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:\WINDOWS\system32\Aternity\Java\JavaHookLoader.dll"="C:\ProgramData\Aternity\hooks"
msg1
msg2
msg3
msg4
但是,使用参数运行 Python 代码auto_offset_reset='earliest'
将第一次打印消息。那么,第一次运行后它不会打印任何消息?
from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest')
for msg in consumer:
print(msg)
解决方案
TL;博士
每次要从头开始阅读主题时,您都需要提供一个新的 group.id,同时保持设置 auto_offset_reset='earliest':
KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest', group_id='newGroup')
如果您的代码在第一次运行时打印输出但不再在后续运行中打印输出,并且在重新启动 Kafka(您的 PC)时您的问题也得到了解决,那么您正在触及 Kafka 中消费者组的概念。由于这是一个非常重要的概念,我强烈建议您在这里熟悉它。
应用程序的消费者组确保它不会两次读取消息。每个消费者都有一个消费者组名称(即使您可能不会直接在代码中看到)。消费者组的偏移位置存储在内部 Kafka 主题中。
现在重新启动 Kafka 后第一次运行代码,Kafka 还不知道消费者组并应用 auto_offset_reset 配置中提供的策略。在您的情况下,它从最早可用的提交中读取。第二次运行代码时,它不需要查看此策略,因为它已经知道消费者,并且不允许消费者再次使用该消息。
因此,如果重新启动 Kafka,消费者的这种内部知识也会消失,并再次应用 auto_offset_reset 策略。
请记住,这是一种 hack,不应该经常在生产系统上执行,因为 consumerGroups 将处于空闲状态。
作为 sid 说明:您的控制台消费者每次运行时都会创建一个新的消费者组。设置“--from-beginning”只是确保 auto_offset_reset 设置为“最早”。
推荐阅读
- python - 读取 .txt 并使用标题字符串作为列表名称
- tensorflow - seq2seq 模型中的训练和推理解码器之间共享什么?
- android - react-native run-android 正在返回:未能执行 aapt。对于 react-naitve-push-notification
- r - 在不考虑顺序的情况下将一行单元格中的所有元素与另一行单元格中的所有元素进行比较
- javascript - Vue父组件道具没有传递给子组件
- c - 使用 gcc 为 HTTPS(OPENSSL)编译 Socket C 程序时出错
- web-services - 通过安全连接在 PAT 端口上提供 Web 服务
- r - facet_wrap() 对象上的误差线困难
- makefile - shell touch 命令在 Makefile 中不起作用
- git - 在云端生产中实际使用 Spring Cloud Config?