python - 使用来自 Confluent Kafka 主题的数据并使用 Python 退出
问题描述
我正在尝试编写一个 python 代码来使用来自 Confluent Kafka 主题的数据,并作为测试项目的一部分执行数据验证。我能够读取数据,但是消耗过程处于无限循环中,并且如果循环读取了所有消息,则寻找退出的决策点。
请参见下面的示例代码
conf = {'bootstrap.servers': "server:port", #
'group.id': str(uuid.uuid1),
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'session.timeout.ms': 6000
}
consumer = Consumer(conf)
consumer.subscribe([topic], on_assign=on_assign)
try:
while True:
msg=consumer.poll(timeout=2.0)
# print(msg)
if msg is None:
print('msg is None')
continue
if msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
print( msg.error().code())
else:
print(msg.value())
except KeyboardInterrupt:
print("Interrupted through kb")
finally:
consumer.close()
请建议什么是决定是否读取所有消息的最佳方法,以便我可以退出循环并关闭消费者。
解决方案
根据定义,Apache Kafka 主题是无限的事件流。流没有“终点”,只有您可以选择定义的人为终点。
您需要在应用程序逻辑中定义它。例如,如果您在<x>
几秒钟内没有收到消息,请将其视为“结束”,并停止消费。
推荐阅读
- angular - Angular:为什么 NO_ERRORS_SCHEMA 不适用于属性?
- java - 使用 Flash Player 加载图形时如何使用 selenium webdriver 处理图形和读取图形数据
- django-models - Django-Oscar 覆盖非抽象模型: ShippingEventQuantity
- ios - 如何在 SwiftUI 中更改导航栏标题的文本属性?
- haskell - 编写一个灵活的“字符串提取器”
- java - 如何实现队列处理器
- javascript - 如何修复窗口打开功能
- javascript - 如何在不重新渲染的情况下为函数值创建 React Context
- windows - 如何将 Matlab 的退出状态返回到 Jenkins 控制台输出?
- go - 如何将 go-cross 添加到 Yocto SDK?或者在其他机器上编译 Go for Yocto?