首页 > 解决方案 > 我正在尝试通过 docker compose 运行烧瓶应用程序和 kafka ..但无法正确设置

问题描述

我正在尝试通过 docker compose..以及 kafka 运行烧瓶应用程序。我正在一起运行烧瓶应用程序和 consumer.py。当我通过烧瓶 api 调用 producer.py 时,它似乎正在发送数据,但消费者没有收到任何东西。由于我同时运行烧瓶应用程序和消费者文件..我也无法看到日志...可以帮助一些人..

我的 docker-compose 文件

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: "zoo1"
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: "kafka1"
    ports:
     - "9092:9092"
    expose:
     - "9093"
    depends_on:
     - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  backend:
    build: .
    image: backend:v1
    command: bash -c "python3 run.py run && python run.py consumer"
    container_name: backend
    restart: always
    volumes:
      - .:/app
    ports:
      - '5000:5000'
    depends_on:
      - mongodb
      - neodb
      - elasticsearch
      - kafka

我的制片人.py

from kafka import KafkaProducer
from json import dumps

class Producer:
    def __init__(self):
        pass

    def producer(self, topic,data):

        producer = KafkaProducer(
        value_serializer=lambda m: dumps(m).encode('utf-8'),
        bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
        producer.send(topic, data)

我的消费者.py

from kafka import KafkaConsumer
from json import loads
import os

class Consumer:
    def consumer(self,topic):
        print("hello")
        consumer = KafkaConsumer(
                topic,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                group_id='my-group-1',
                value_deserializer=lambda m: loads(m.decode('utf-8')),
                bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
        while True:
            dirName = "bad"
            if not os.path.exists(dirName):
                os.mkdir(dirName)
            for m in consumer:
                pass

标签: pythondockerflask

解决方案


看来你没有得到任何对象consumer = KafkaCo...

这是您必须确保输入正确的文档。

阿帕奇卡夫卡文档


推荐阅读