windows - 执行 Spark 流式传输以从 Kafka 主题读取数据时发生错误
问题描述
我是 Kafka 和 Spark 的新手。我已通过 Kafka 生产者传递消息并尝试在火花流中读取,但在 main 方法中出现错误。代码如下。
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 Streaming Example.py
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import time
# In[ ]:
if __name__ == "__main__":
spark = SparkSession.builder.master("local").appName("Kafka Spark Demo").getOrCreate()
sc=spark.sparkContext
ssc=StreamingContext(sc, 20)
message = KafkaUtils.createDirectStream(ssc, topic=['testtopic'], KafkaParams = {"metadata.broker.list": "localhost:9092"})
data = message.map(lambda x: x[1])
def functordd(rdd):
try:
rdd1=rdd.map(lambda x: json.loads(x))
df = spark.read.json(rdd1)
df.show()
sf.createOrReplaceTimeView("Test")
df1=spark.sql("select iss_position.latitude, iss_position.longitude, message, timestamp from Test")
df1.write.format('csv').mode('append').save("testing")
except:
pass
data.foreachRDD(functordd)
sc.stop()
解决方案
你给出的命令是试图找到/执行一个名为的文件Streaming
并给它一个名为的参数Example.py
您需要在任何带有空格的文件周围加上引号,以将它们作为单个参数
而且,org.apache.spark:spark-sql-kafka-0-10 ...
如果您尝试对 Kafka 数据使用 SQL 查询,则应该使用(对于您各自的 Spark 版本;如果您是 Spark 的新手,应该是 Spark 3);流式依赖已被弃用
推荐阅读
- angular - Angular 2:在单独的组件中更改数据时,类绑定不会更新
- c++ - 如何在 linux 上将 pylon 相机链接到 qt creater
- javascript - jquery错误的事件处理程序
- linux - 由于 ALTER SYSTEM 设置错误,无法启动 postgres
- openstreetmap - 离线路由......初学者从哪里开始
- jpa - 卡在将值从 selectcheckboxmenu 传递到 managedbean
- c# - 在循环内创建列表时,由于其保护级别而无法访问
- python-3.x - 通过 id 和 date 使用 pivot/groupby 重塑数据框
- json - 对 Flask Rest API 的 Ajax 调用未获取 JSON
- html - 为什么 position:sticky 不能与 flexbox 一起使用?