python - 我正在尝试通过 docker compose 运行烧瓶应用程序和 kafka ..但无法正确设置
问题描述
我正在尝试通过 docker compose..以及 kafka 运行烧瓶应用程序。我正在一起运行烧瓶应用程序和 consumer.py。当我通过烧瓶 api 调用 producer.py 时,它似乎正在发送数据,但消费者没有收到任何东西。由于我同时运行烧瓶应用程序和消费者文件..我也无法看到日志...可以帮助一些人..
我的 docker-compose 文件
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: "zoo1"
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: "kafka1"
ports:
- "9092:9092"
expose:
- "9093"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
backend:
build: .
image: backend:v1
command: bash -c "python3 run.py run && python run.py consumer"
container_name: backend
restart: always
volumes:
- .:/app
ports:
- '5000:5000'
depends_on:
- mongodb
- neodb
- elasticsearch
- kafka
我的制片人.py
from kafka import KafkaProducer
from json import dumps
class Producer:
def __init__(self):
pass
def producer(self, topic,data):
producer = KafkaProducer(
value_serializer=lambda m: dumps(m).encode('utf-8'),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
producer.send(topic, data)
我的消费者.py
from kafka import KafkaConsumer
from json import loads
import os
class Consumer:
def consumer(self,topic):
print("hello")
consumer = KafkaConsumer(
topic,
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='my-group-1',
value_deserializer=lambda m: loads(m.decode('utf-8')),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
while True:
dirName = "bad"
if not os.path.exists(dirName):
os.mkdir(dirName)
for m in consumer:
pass
解决方案
推荐阅读
- flutter - Flutter - 更新 CustomScrollView 的滚动控制器
- c# - 通过 MSIX 包安装的应用程序“忘记”虚拟 LocalAppData 路径
- javascript - 隐藏没有价值的图表线
- c++ - SharedDtor() google protobuf (GetArenaNoVirtual() == nullptr) 崩溃
- amazon-web-services - cloud-init-output.log 在 EC2 实例上被覆盖
- optaplanner - 这是 Optaplanner 的智能用例吗?
- arrays - 如何检查两个 NumPy 数组是否近似相等?
- powerbi - PowerBI 表:如何将数字添加到列名
- python - jupyter notebook 不会运行 str()
- reactjs - Gutenberg Block -> 媒体上传 - 不同图像的数组