python - Python Producer 可以通过 shell 发送,但不能通过 .py
问题描述
我有一个正在运行并经过测试的 Kafka 集群,并且正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。
我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以使用 Kafka GUI 工具 2.0.4 看到消息出现在主题中
>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
这有效,并且字节出现在代理主题数据中。
当我将与上面相同的代码放入 python .py 文件并使用 Python3 执行时,它完成,但没有数据发送到 Kafka 代理。也没有显示错误。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
解决方案
如您所见,它返回了一个未来。
Kafka 客户端将批量记录,它们不会立即一次发送一条记录,为了做到这一点,您需要等待或刷新生产者缓冲区,以便它在应用程序退出之前发送。换句话说,交互式终端将生产者数据保存在内存中,在后台运行,反之则丢弃该数据
future = producer.send(...)
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
或者只是放producer.flush()
,如果你不关心元数据或抓住未来。
推荐阅读
- eclipse - 将 Tomcat 与 Eclipse 集成
- java - 一种有效的分位数算法/数据结构,允许样本随着时间的推移而更新?
- heroku - 为什么 Heroku 在一个简单的流式应用程序上崩溃?
- java - 使用带有 Android Box 的 OmniKey 读卡器 - 读取扫描的卡片
- .net-core - Yaml 管道。如何使用控制台应用程序在构建中交换连接字符串
- mysql - MySQL/MariaDB 服务器:仅绑定到 1 个 IP 地址
- javascript - 如何在每次单击按钮后出现警报,并在 5 秒后消失?
- excel - 如何在 Excel 中使用 Selenium 应用 SendKeys Keys.Enter(或 Keys.Return)?
- python - 在 Google Cloud Run 容器中使用 Dockerfile 安装 popplerqt5 时,没有名为“popplerqt5”的模块错误
- c# - 为什么 SonarQube 8.3 不检查 C# 代码?