
Problem description

I want to create a new Kafka topic using Python. When I try to create a KafkaAdminClient with server="kafka:9092", as shown below, I get an error:

self._kafka_admin = KafkaAdminClient(
    bootstrap_servers=[server],
    api_version=(0, 10, 2),
    api_version_auto_timeout_ms=120000)

The error I get:

Traceback (most recent call last):
  File "main.py", line 47, in <module>
    kafka_manager = KafkaManager("kafka:9092") 
  File "/app/src/kafka/kafka_manager.py", line 24, in __init__
    self._kafka_admin = KafkaAdminClient(
  File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 211, in __init__
    self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
  File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

In addition, this is the docker-compose file I built:

version: '3'
services:
  spark-master:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8080:8080'
      - '7077:7077'
    networks:
      - spark
    container_name: spark
  spark-worker-1:
    image: docker.io/bitnami/spark:2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - type: bind
        source: ./conf/log4j.properties
        target: /opt/bitnami/spark/conf/log4j.properties
    ports:
      - '8081:8081'
    container_name: spark-worker
    networks:
      - spark
    depends_on:
      - spark-master
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    container_name: zookeeper
    networks: 
      - rmoff_kafka
  kafka:
    image: confluentinc/cp-kafka:5.5.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    container_name: kafka
    networks: 
      - rmoff_kafka
  app:
    build:
      context: ./
    depends_on: 
      - kafka
    ports:
      - 5000:5000
    container_name: app
    networks: 
      - rmoff_kafka

networks:
  spark:
    driver: bridge
  rmoff_kafka:
    name: rmoff_kafka
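
As an aside: with the compose file above, KAFKA_ADVERTISED_LISTENERS advertises only kafka:9092, which resolves inside the rmoff_kafka network but not from the host. A commonly used sketch for the cp-kafka image, if you also need host access, advertises two listeners (the listener names PLAINTEXT_HOST and the 29092 port choice here are conventional, not required):

```yaml
# Sketch: one listener for containers (kafka:29092), one for the host (localhost:9092).
kafka:
  image: confluentinc/cp-kafka:5.5.0
  ports:
    - 9092:9092
  environment:
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
```

Containers on the same network would then connect with bootstrap_servers=["kafka:29092"], while tools on the host use localhost:9092.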

Finally, the structure looks like this, with two containers (one for the Python app and one for Kafka): (screenshot)

And the output of docker ps showing the container details: (screenshot)

Tags: python, docker, apache-kafka, tcp, kafka-python

Solution


The solution was to use the container's name (kafka) as the host name, and to add a sleep() in the Python code so the client waits for the Kafka container to finish starting before connecting.
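
A minimal sketch of that fix: instead of a single fixed sleep(), retry the connection until the broker is reachable. The wait_for helper and the topic name "my-topic" are my own naming, not from the original code; the KafkaAdminClient, NewTopic, and NoBrokersAvailable names are from kafka-python.

```python
import time


def wait_for(factory, retries=10, delay=3, transient=(Exception,)):
    """Call factory() until it succeeds, retrying on `transient` errors.

    Re-raises the last error once `retries` attempts are exhausted.
    """
    for attempt in range(1, retries + 1):
        try:
            return factory()
        except transient:
            if attempt == retries:
                raise
            time.sleep(delay)


# Usage with kafka-python inside the app container (assumes the broker is
# reachable at kafka:9092 on the shared docker network):
#
# from kafka.admin import KafkaAdminClient, NewTopic
# from kafka.errors import NoBrokersAvailable
#
# admin = wait_for(
#     lambda: KafkaAdminClient(bootstrap_servers=["kafka:9092"]),
#     retries=10, delay=3, transient=(NoBrokersAvailable,))
# admin.create_topics([NewTopic("my-topic", num_partitions=1, replication_factor=1)])
```

Retrying is more robust than a fixed sleep() because the broker's startup time varies between machines and runs.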

