首页 > 解决方案 > KafkaProducer 不读取消息

问题描述

我有一个用于简单流式传输的 dockerized Spark 应用程序。侦听器生成随机数并使用以下代码将它们发送到 Kafka:

producer = KafkaProducer(bootstrap_servers=kafka_brokers, api_version=(0, 10, 1))
while True:
    data = //generate a json with a single number
    producer.send(topic_name, str.encode(json.dumps(data)))

然后我尝试使用消费者读取这些数据:

consumer = KafkaConsumer(topic_name, bootstrap_servers=['192.168.99.100:9092'])
for message in consumer:
    record = json.loads(message.value)
    list.append(record['field'])

当我运行代码时,它永远不会超过“消费者中的消息”部分。我在 Kafka 中进行了检查,消息都在那里,但我无法通过 Python 访问它们。

编辑:我正在使用bitnami spark容器和kafka 和 zookeeper 的这个设置。

我只有两个单独的文件,一个用于生产者,一个用于消费者。我运行发送到 Kafka 的生产者文件,然后我火花提交消费者文件,该文件应该只打印收到的数字列表。为此,我只需执行 spark-submit --master spark://spark:7077 consumer.py

标签: dockerapache-kafka

解决方案


推荐阅读