python - 在 python 中使用来自不同容器的 Kafka 消息
问题描述
下面是我的 docker-compose 文件:
version: "3"
services:
app:
image: app
restart: always
ports:
- "8001:8081"
depends_on:
- kafka
- zookeeper
- consumer
environment:
- KAFKA_HOST = kafka
zookeeper:
image: "wurstmeister/zookeeper:latest"
ports:
- "2181:2181"
hostname: zookeeper
kafka:
image: "wurstmeister/kafka:2.12-2.2.0"
ports:
- "9092:9092"
hostname: kafka
links:
- zookeeper:zookeeper
environment:
KAFKA_CREATE_TOPICS: "Topic01:2:2" #TOPIC:PARTITON:REPLICATION
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: "60000"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: 'PLAINTEXT://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
consumer:
image: consumer:latest
build:
context: ./consumer
ports:
- "8283:8283"
文本流是使用app
图像容器生成的,并且能够生成消息。
它通过在 kafka 容器内导航来验证:
docker exec -it <CONTAINER ID of Kafka Image> /bin/bash
当我在容器内手动运行以下脚本时
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Topic01 --from-beginning
我可以看到正在生成的文本块。
但是当我尝试从图像的容器中读取相同的数据时consumer
,它给了我空白,
尝试的方法:
docker exec
在消费者容器内,并运行以下 python 代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('Topic01', bootstrap_servers='kafka:9092')
for messages in consumer:
print(messages)
它什么也不打印。
是因为我的 docker-compose 文件错误还是 python 代码中的错误?
解决方案
根据针对听众的 Kafka 文档:
侦听器列表 - 我们将侦听的 URI 的逗号分隔列表和侦听器名称。如果侦听器名称不是安全协议,还必须设置 listener.security.protocol.map。将主机名指定为 0.0.0.0 以绑定到所有接口。将主机名留空以绑定到默认接口。合法监听器列表示例:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
您设置KAFKA_LISTENERS
为PLAINTEXT://:9092
,因此它绑定到默认接口,可能无法从您的 Kafka 容器外部访问(同时它与使用 localhost 的控制台使用者一起工作)。试一试并指定KAFKA_LISTENERS
为PLAINTEXT://0.0.0.0:9092
, 以检查您的消费者是否开始消费消息。
推荐阅读
- node.js - 即使在时间结束后如何停止节点计划重复作业
- json - Flutter:参数类型“String”不能分配给参数类型“Uri”
- assembly - x64 中的这段代码有什么问题?它在 VS2019 中不返回 0
- rust - 将 ty 转换为 macro_rules 中的 ident
- python - 不显示提取的数据
- spring - 如何在 Spring-boot 应用程序中获取 context-path 和 info.info?
- python - 是否可以通过迭代附加到数据框?
- scala - 在 Scala 中导入 Doodle 库
- javascript - 带有 json 转义的 Mustache 渲染
- amazon-web-services - 如何为基于事件的数据构建 DynamoDB 表?