首页 > 解决方案 > 执行 Spark 流式传输以从 Kafka 主题读取数据时发生错误

问题描述

我是 Kafka 和 Spark 的新手。我已通过 Kafka 生产者传递消息并尝试在火花流中读取,但在 main 方法中出现错误。代码如下。

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 Streaming Example.py

主要方法错误

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json 
import time 


# In[ ]:


if __name__ == "__main__":
    spark = SparkSession.builder.master("local").appName("Kafka Spark Demo").getOrCreate()
    sc=spark.sparkContext
    ssc=StreamingContext(sc, 20)
    message = KafkaUtils.createDirectStream(ssc, topic=['testtopic'], KafkaParams = {"metadata.broker.list": "localhost:9092"})
    data = message.map(lambda x: x[1])
    
    def functordd(rdd):
        try:
            rdd1=rdd.map(lambda x: json.loads(x))
            df = spark.read.json(rdd1)
            df.show()
            sf.createOrReplaceTimeView("Test")
            df1=spark.sql("select iss_position.latitude, iss_position.longitude, message, timestamp from Test")
            
            df1.write.format('csv').mode('append').save("testing")
            
        except:
            pass
        
    data.foreachRDD(functordd)
    sc.stop()

标签: windowsapache-sparkpysparkapache-kafka

解决方案


你给出的命令是试图找到/执行一个名为的文件Streaming并给它一个名为的参数Example.py

您需要在任何带有空格的文件周围加上引号,以将它们作为单个参数

而且,org.apache.spark:spark-sql-kafka-0-10 ...如果您尝试对 Kafka 数据使用 SQL 查询,则应该使用(对于您各自的 Spark 版本;如果您是 Spark 的新手,应该是 Spark 3);流式依赖已被弃用


推荐阅读