首页 > 解决方案 > 使用来自 Confluent Kafka 主题的数据并使用 Python 退出

问题描述

我正在尝试编写一个 python 代码来使用来自 Confluent Kafka 主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消耗过程处于无限循环中,并且如果循环读取了所有消息,则寻找退出的决策点。

请参见下面的示例代码

conf = {'bootstrap.servers': "server:port", #
    'group.id': str(uuid.uuid1),
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',        
    'session.timeout.ms': 6000
    }
consumer = Consumer(conf) 
consumer.subscribe([topic], on_assign=on_assign)
try:
while True:
    msg=consumer.poll(timeout=2.0)

    # print(msg)

    if msg is None:
        print('msg is None')
        continue

    if msg.error().code() == KafkaError._PARTITION_EOF:
        print('End of partition reached {0}/{1}'
              .format(msg.topic(), msg.partition()))
        print( msg.error().code())
    else:
        print(msg.value())           

except KeyboardInterrupt:
   print("Interrupted through kb")

finally:
   consumer.close()

请建议什么是决定是否读取所有消息的最佳方法,以便我可以退出循环并关闭消费者。

标签: pythonkafka-consumer-apiconfluent-platform

解决方案


根据定义,Apache Kafka 主题是无限的事件流。流没有“终点”,只有您可以选择定义的人为终点。

您需要在应用程序逻辑中定义它。例如,如果您在<x>几秒钟内没有收到消息,请将其视为“结束”,并停止消费。


推荐阅读