How to convert a DataFrame to RDDs in Structured Streaming?

Problem description

I use pyspark Structured Streaming to read data from Kafka, and the result is a DataFrame. When I try to convert the DataFrame to an RDD, it fails with:

Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

The correct version of the code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

And this is the incorrect version of the code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

Why can't the DataFrame be converted to an RDD? And what should I do when I want to convert a DataFrame to an RDD in pyspark Structured Streaming?

Tags: apache-spark, spark-streaming

Solution


A streaming DataFrame is unbounded, so operations such as .rdd that would materialize it immediately are not supported before the query is started with writeStream.start(); that is exactly what the AnalysisException is telling you. If your Spark version is 2.4.0 or later, you can instead use foreach to run your custom per-row logic on the streaming DataFrame without converting it to an RDD, for example:

def process_row(row):
    # your custom logic for each Row of the streaming DataFrame
    print(row.value.split(" "))

query = df.writeStream.foreach(process_row).outputMode("update").start()
query.awaitTermination()
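
If you genuinely need an RDD, another option (also available from Spark 2.4.0) is foreachBatch, which hands your function a regular, non-streaming DataFrame for every micro-batch; that DataFrame can be converted to an RDD as usual. Below is a minimal sketch, assuming the same Kafka source df as above; process_batch is just an illustrative name:

def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame, so .rdd is allowed here
    rdd = batch_df.rdd.map(lambda x: x.value.split(" "))
    print(batch_id, rdd.take(5))

query = df.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='30 seconds') \
    .start()

query.awaitTermination()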
