python-3.x - `for .. in KafkaConsumer`:如何在`eof`中生存?
问题描述
for consumed_record in KafkaConsumer(config.kafka_topic,
bootstrap_servers=config.kafka_bootstrap_servers,
group_id=config.kafka_group_id):
__process_kafka_record(consumed_record)
上面的代码工作正常,除非EOF
看到。然后for
循环终止,但这不是我想要的行为。我希望我的消费者无休止地运行,除非整个过程都结束了。从概念上讲,我需要类似if is_eof(): continue
. python-kafka
用包做到这一点的正确方法是什么?
解决方案
看起来您正在使用kafka-python。
使用记录的正确方法是将该循环包装在另一个循环中,该循环仅在您愿意时才停止。
kafka-python 项目包含一些示例,例如:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(['my-topic'])
while not self.stop_event.is_set():
for message in consumer:
print(message)
if self.stop_event.is_set():
break
consumer.close()
推荐阅读
- arrays - 使用惰性 var 并且 Xcode 仍然说不能使用实例,属性初始化程序在“自我”之前运行?
- angular - 即使在尝试了多种方式之后,GitLab 管道也经常失败
- rabbitmq - 如何在 MQ 上捕获错误请求?
- regex - 设置正则表达式的优先级
- angular - 动态设置 scss 变量 angular ionic
- python - 如何使用 XPath 查找标题包含单引号和双引号的元素?
- intellij-idea - IntelliJ - 仅更改项目菜单的文本大小
- azure - 在 Azure 应用服务中更改系统日期?
- python - 重命名 groupby 的列名并使用 Pandas 计算结果
- android - 单击项目后,recyclerview 中的项目数量会自行增加