python - 使用 python 从连接到另一个容器的容器创建一个新的 kafka 主题
问题描述
我想使用 python 创建一个新的 kafka 主题,当我尝试使用以下命令创建 KafkaAdminClient 时出现错误server="kafka:9092"
:
self._kafka_admin = KafkaAdminClient(
bootstrap_servers=[server],
api_version=(0, 10, 2),
api_version_auto_timeout_ms=120000)
我得到的错误:
Traceback (most recent call last):
File "main.py", line 47, in <module>
kafka_manager = KafkaManager("kafka:9092")
File "/app/src/kafka/kafka_manager.py", line 24, in __init__
self._kafka_admin = KafkaAdminClient(
File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 211, in __init__
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
此外,我构建了下一个 docker-compose 文件:
version: '3'
services:
spark-master:
image: docker.io/bitnami/spark:2
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
volumes:
- type: bind
source: ./conf/log4j.properties
target: /opt/bitnami/spark/conf/log4j.properties
ports:
- '8080:8080'
- '7077:7077'
networks:
- spark
container_name: spark
spark-worker-1:
image: docker.io/bitnami/spark:2
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://localhost:7077
- SPARK_WORKER_MEMORY=1G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
volumes:
- type: bind
source: ./conf/log4j.properties
target: /opt/bitnami/spark/conf/log4j.properties
ports:
- '8081:8081'
container_name: spark-worker
networks:
- spark
depends_on:
- spark-master
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
container_name: zookeeper
networks:
- rmoff_kafka
kafka:
image: confluentinc/cp-kafka:5.5.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
container_name: kafka
networks:
- rmoff_kafka
app:
build:
context: ./
depends_on:
- kafka
ports:
- 5000:5000
container_name: app
networks:
- rmoff_kafka
networks:
spark:
driver: bridge
rmoff_kafka:
name: rmoff_kafka
解决方案
解决方案是将主机名更改为容器的名称(kafka),并在 python 中添加 sleep() 命令以等待 kafka 容器启动。
推荐阅读
- python - 比较两个数据框中的列
- java - MongoDB是否确保在成功保存响应后,数据将立即可搜索
- cmake - 即使 BUILD_SHARED_LIBS 设置为 ON,如何强制 cmake 静态链接特定库
- pandas - 通过长期平均 Pandas 计算每月变异性
- javascript - asp.net C#使用javascript函数从gridview中获取选定的行
- coq - 为什么 Coq 中的所有数字文字都显示 nat 类型?
- php - Laravel 8.X 问题:直接对数据库进行更改,无需迁移
- r - 重命名 X 轴变量以分隔当前月份数据和上个月数据
- typescript - 如何使用 CDK 订阅通知或 SNS 主题的警报?
- python-3.x - 从没有元音的字符串列表中查找字符串