apache-spark - 如何在结构化流中将数据帧转换为 rdds?
问题描述
我使用 pyspark 流从 kafka 获取数据,结果是一个数据帧,当我将数据帧转换为 rdd 时,它出错了:
Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
正确的版本代码:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
这是错误的版本代码:
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
q = df.writeStream \
.format("console") \
.trigger(processingTime='30 seconds') \
.start()
q.awaitTermination()
为什么它不能将数据帧转换为rdd?当我想在 pyspark 流中将数据帧转换为 rdd 时,我该怎么做?
解决方案
如果您的 spark 版本是 2.4.0 及更高版本,那么您可以使用以下替代方法来处理数据帧的每一行。
query=df.writeStream.foreach(Customized method to work on each row of dataframe rather than RDD).outputMode("update").start()
ssc.start()
ssc.awaitTermination()
推荐阅读
- openedge - 进展 4GL:关于 OUTPUT TO 语句
- c# - 在 C# 中创建具有多个终端和可能的无效中间状态的流畅接口
- java - 读取 Json 文件但键没有引号
- java - 如果相同,则将数组中的值相乘
- python - 有没有办法在 Python 上使用 itertools 获得不重叠且详尽的列表的“组合组”?
- angular - 在Angular中动态使用setValidators后如何不丢失初始化的验证器?
- python - Gimp python-fu 如何更改菜单文本(标签)
- linux - 为什么 `ld -L -l...` 不匹配 `ld /path/to/library.so` 的行为?
- javascript - 找不到 gm/convert 二进制文件 Node.js pdf-to-image 转换
- javascript - 使用 Google Drive API Node js 上传文件时,文件名是“无标题”