docker - Failed to find data source: kafka (Docker environment)
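Problem description
We are currently facing this problem, and none of the suggested "similar questions" helped us solve it. We are new to both Docker and Spark.
We use the following Docker Compose file to set up our containers: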
networks:
  spark_net:
volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock
We also created two Python files to test whether the Kafka stream works:
Producer:
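import json
import time
from kafka import KafkaProducer  # this import is needed for KafkaProducer below

# Serialize each value as UTF-8 encoded JSON before sending it.
producer = KafkaProducer(bootstrap_servers=['twitter-streaming_kafka_1:9093'],
                         api_version=(0, 11, 5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
# Send one numbered message to the 'corona' topic every half second.
for e in range(1000):
    data = {'number': e}
    producer.send('corona', value=data)
    time.sleep(0.5)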
Consumer:
import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
    bootstrap_servers=['twitter-streaming_kafka_1:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))
print('printing messages')
for message in consumer:
    message = message.value
    print(message)
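When we run these two scripts in separate CLIs inside the jupyterlab container, they work. When we try to connect to our producer stream through PySpark with the following code, we get the error above.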
import random
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9093").option("subscribe", "corona").load()
We also ran the following command in the spark-master CLI:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
Stack trace
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
6
7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()
/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
418 return self._df(self._jreader.load(path))
419 else:
--> 420 return self._df(self._jreader.load())
421
422 @since(2.0)
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
132 # Hide where the exception came from that shows a non-Pythonic
133 # JVM exception message.
--> 134 raise_from(converted)
135 else:
136 raise
/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
Solution
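Your Kafka container needs to be placed on the spark_net network so that the Spark containers can resolve it by name. The same applies to Jupyter if you want it to be able to launch jobs on the Spark cluster.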
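One way to confirm that the network change took effect is to check, from inside each Spark container (and from Jupyter), that the broker can be reached by name. The following is a minimal sketch using only the Python standard library. The helper names `can_reach` and `report` are hypothetical, and `kafka:9093` is assumed from the INSIDE advertised listener in the Compose file above.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers failed name resolution (socket.gaierror),
        # refused connections, and timeouts.
        return False


def report(host="kafka", port=9093):
    """Print whether the (assumed) broker address is reachable from here."""
    status = "reachable" if can_reach(host, port) else "NOT reachable"
    print(f"{host}:{port} is {status} from this container")
```

Calling `report()` in a Python shell inside spark-worker-1, for example, would show whether that worker can open a connection to the broker. A successful TCP connection only shows that the name resolves and the port is open; it does not prove that the Kafka protocol handshake will succeed.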
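In addition, you need to add the Kafka package to the Spark application that actually runs the query. Starting spark-shell with --packages on spark-master only affects that shell session, not the PySpark session created in Jupyter.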
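For a PySpark session created in a notebook, one possible way to add the package is the `spark.jars.packages` configuration option. The sketch below assumes Spark 3.0.1 built against Scala 2.12 (the versions used in the spark-shell command in the question) and a master URL of `spark://spark-master:7077` derived from the Compose file. The function names are hypothetical, and the versions need to match the Spark installation in your images.

```python
def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(spark_version="3.0.1", master="spark://spark-master:7077"):
    """Create a SparkSession that downloads the Kafka connector at startup."""
    # Imported here so the helper above can be used without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName("KafkaStreaming")
        .master(master)  # assumed master URL; adjust to your cluster
        .config("spark.jars.packages", kafka_connector_coordinate(spark_version))
        .getOrCreate()
    )


def read_corona_stream(spark):
    """Open the 'corona' topic as a streaming DataFrame."""
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9093")
        .option("subscribe", "corona")
        .load()
    )
```

In the notebook, `df = read_corona_stream(build_session())` would then replace the original two lines. The option has to be set before the first session starts: if a session already exists in the running kernel, `getOrCreate()` returns it without loading the package, so restarting the kernel first is likely necessary.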