apache-kafka - Kafka 客户端 poll() 每次收到消息后都会抛出 EOFError
问题描述
我正在使用我在Confluent github中找到的示例客户端轮询代码的一个轻微变体:
c = Consumer({'bootstrap.servers':'localhost:9092','group.id':'devops','auto.offset.reset':'earliest'})
c.subscribe(['system-diskio-write-bytes','system-cpu-user-pct'])
try:
while True:
msg = c.poll(timeout=1000.0)
if msg is None:
continue
if msg.error():
print(msg.error())
else:
print('topic: %s key: %s value: %s' % (msg.topic(), msg.key(), msg.value()))
except KeyboardInterrupt:
print('Polling interrupted by consumer')
在收到每条消息后都会引发 EOF KafkaError:
topic: system-diskio-write-bytes key: None value: b'{"route" : "system-diskio-write-bytes", "timestamp" : 2019-03-06T13:46:25.244, "value" : 655002980352.0}'
KafkaError{code=_PARTITION_EOF,val=-191,str="Broker: No more messages"}
我不明白为什么会发生这种情况 - 关于为什么抛出这个错误的任何想法如何解决?任何想法都非常感谢 - 谢谢!
解决方案
好的,这里详细解答。解释如下:
无论您的应用程序是否在 10 秒内没有调用 poll(),每次消费者遇到新的结束偏移量时,都会将 EOF 事件推送到内部消息队列(由 poll() 提供服务)。
可以按如下方式禁用 EOF 事件:
enable.partition.eof=false
推荐阅读
- console - 是否可以在 OpenSuse 中以编程方式设置 .app 文件的“可执行”属性?
- netlogo - 使汽车以正确的速度行驶(NetLogo)
- html - 我的 django 项目没有显示正文背景
- vim - 如何在本地 vimrc (.lvimrc) 文件中设置 clang 格式选项?
- php - PHPExcel我只能在单元格中加粗某些单词吗
- credential-providers - 我想显示另一个屏幕以在凭据提供程序中请求 OTP
- solr - 从 solr api 和 solr 仪表板中找到不同的 num,用于相同的 solr 查询
- javascript - 对象扩展运算符作为函数参数
- javascript - 承诺中的 AJAX 请求即使在保护子句之后也会执行
- selenium - 带有 Tor 浏览器的 Python Selenium (Ubuntu)