Failed to find data source: kafka (Docker environment)

Problem description

We are currently facing this problem, and none of the "similar questions" shown to us helped solve it. We are new to both Docker and Spark.

We use the following Docker Compose file to set up our containers:


networks:
  spark_net:

volumes:
  shared-workspace:
    name: "hadoop-distributed-file-system"
    driver: local
services:
  jupyterlab:
    image: jupyterlab
    container_name: jupyterlab
    ports:
      - 8888:8888
    volumes:
      - shared-workspace:/opt/workspace
  spark-master:
    image: spark-master
    networks:
      - spark_net
    container_name: spark-master
    ports:
      - 8080:8080
      - 7077:7077
    volumes:
      - shared-workspace:/opt/workspace
  spark-worker-1:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-1
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8081:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
  spark-worker-2:
    image: spark-worker
    networks:
      - spark_net
    container_name: spark-worker-2
    environment:
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=512m
    ports:
      - 8082:8081
    volumes:
      - shared-workspace:/opt/workspace
    depends_on:
      - spark-master
      
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
      
  kafka:
    image: wurstmeister/kafka
    ports:
      - "7575"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./var/run/docker.sock

We also created two Python files to test whether the Kafka stream works:

Producer:

import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers = ['twitter-streaming_kafka_1:9093'],
                         api_version=(0,11,5),
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))
for e in range(1000):
    data = {'number' : e}
    producer.send('corona', value=data)
    time.sleep(0.5)

Consumer:

import time
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json

print('starting consumer')
consumer = KafkaConsumer(
    'corona',
     bootstrap_servers=['twitter-streaming_kafka_1:9093'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

print('printing messages')
for message in consumer:
    message = message.value
    print(message)

When we run these two scripts in separate CLIs inside the jupyterlab container, everything works. But when we try to connect to the producer's stream through PySpark with the following code, we get the error above.

import random
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9093") \
    .option("subscribe", "corona") \
    .load()

We also ran the following command in the spark-master CLI:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...

Stack trace

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-2-4dba09a73304> in <module>
      6 
      7 spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
----> 8 df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "twitter-streaming_kafka_1:9093").option("subscribe", "corona").load()

/usr/local/lib/python3.7/dist-packages/pyspark/sql/streaming.py in load(self, path, format, schema, **options)
    418             return self._df(self._jreader.load(path))
    419         else:
--> 420             return self._df(self._jreader.load())
    421 
    422     @since(2.0)

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

Tags: docker, apache-spark, pyspark, apache-kafka

Solution


Your Kafka container needs to be placed on the spark_net network so that the Spark containers can resolve it by name.

The same applies to Jupyter if you want it to be able to launch jobs on the Spark cluster.
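
A minimal sketch of the Compose change, assuming the service and network names from the file above. Only the added networks entries are shown; everything else in each service stays as posted (whether zookeeper also needs to join depends on your setup, but kafka and jupyterlab are the ones this answer points to):

  # additions only; the rest of each service definition is unchanged
  jupyterlab:
    networks:
      - spark_net    # lets the Jupyter container resolve spark-master and kafka by name
  zookeeper:
    networks:
      - spark_net
  kafka:
    networks:
      - spark_net    # lets the Spark driver and executors resolve "kafka" by name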

Additionally, you need to add the Kafka package to the Spark session that actually runs readStream; otherwise Spark cannot find the kafka data source.
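
A minimal sketch of supplying that package from PySpark, assuming Spark 3.0.1 built with Scala 2.12 (the same coordinates as the spark-shell command above); adjust the coordinates to match your Spark build. Note that passing --packages only to spark-shell on the master does not help a Jupyter notebook session, since the dependency has to be resolved by the session that runs the query:

from pyspark.sql import SparkSession

# The Kafka connector is not bundled with Spark, so pull it in when the session
# is created, e.g. via spark.jars.packages (must be set before any session exists).
spark = (SparkSession.builder
         .appName('KafkaStreaming')
         .config('spark.jars.packages',
                 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')
         .getOrCreate())

# With kafka joined to spark_net, the broker is reachable by its service name
# on the INSIDE listener port (9093) defined in the Compose file.
df = (spark.readStream
      .format('kafka')
      .option('kafka.bootstrap.servers', 'kafka:9093')
      .option('subscribe', 'corona')
      .load())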

