首页 > 解决方案 > `for .. in KafkaConsumer`:如何在`eof`中生存?

问题描述

for consumed_record in KafkaConsumer(config.kafka_topic,                                      
                                     bootstrap_servers=config.kafka_bootstrap_servers,
                                     group_id=config.kafka_group_id):
        __process_kafka_record(consumed_record)

上面的代码工作正常,除非EOF看到。然后for循环终止,但这不是我想要的行为。我希望我的消费者无休止地运行,除非整个过程都结束了。从概念上讲,我需要类似if is_eof(): continue. python-kafka用包做到这一点的正确方法是什么?

标签: python-3.xapache-kafkakafka-consumer-apikafka-python

解决方案


看起来您正在使用kafka-python

使用记录的正确方法是将该循环包装在另一个循环中,该循环仅在您愿意时才停止。

kafka-python 项目包含一些示例,例如:

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(['my-topic'])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)
        if self.stop_event.is_set():
            break

consumer.close()

推荐阅读