docker - KafkaProducer 不读取消息
问题描述
我有一个用于简单流式传输的 dockerized Spark 应用程序。侦听器生成随机数并使用以下代码将它们发送到 Kafka:
producer = KafkaProducer(bootstrap_servers=kafka_brokers, api_version=(0, 10, 1))
while True:
data = //generate a json with a single number
producer.send(topic_name, str.encode(json.dumps(data)))
然后我尝试使用消费者读取这些数据:
consumer = KafkaConsumer(topic_name, bootstrap_servers=['192.168.99.100:9092'])
for message in consumer:
record = json.loads(message.value)
list.append(record['field'])
当我运行代码时,它永远不会超过“消费者中的消息”部分。我在 Kafka 中进行了检查,消息都在那里,但我无法通过 Python 访问它们。
编辑:我正在使用bitnami spark容器和kafka 和 zookeeper 的这个设置。
我只有两个单独的文件,一个用于生产者,一个用于消费者。我运行发送到 Kafka 的生产者文件,然后我火花提交消费者文件,该文件应该只打印收到的数字列表。为此,我只需执行 spark-submit --master spark://spark:7077 consumer.py
解决方案
推荐阅读
- python-3.x - 使用 Python 3 推导遍历列表中 dicts 中的项目
- javascript - 如何在 React 中编写通过 useCallback 优化的 onClick(带参数)?
- python - 如何比较 Python 中的 Fraction 对象?
- php - 如何通过传递字符串的php数组来进行postgresql查询,用双引号欺骗?
- javascript - Cypress 遍历表格并单击从 gherkin 文件传递的匹配文本
- ios - iOS UITests 通过 Xcode 但因 xcodebuild 失败
- c# - 如何退出 C# GraphQL 订阅?
- python - 如何在python中检查另一个列表中的列表
- reactjs - 如何处理循环中的复选框反应原生
- python - 将 CSV 文件迁移到 Django SQL 后端的最佳方法?