Confusion on types of Spark RDDs

Problem Description

I am just learning Spark; I started with RDDs and am now moving on to DataFrames. In my current pyspark project, I am reading an S3 file into an RDD and running some simple transformations on it. Here is the code.

segmentsRDD = (sc.textFile(fileLocation)
    # keep only rows whose field [6] is an included site
    .filter(lambda line: line.split(",")[6] in INCLUDE_SITES)
    # drop rows whose field [2] is an excluded market
    .filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS)
    # drop rows containing the literal string "null"
    .filter(lambda line: "null" not in line)
    # splitComma returns 10 comma-delimited fields per row
    .map(splitComma)
    # keep only rows where field [5] equals '1'
    .filter(lambda line: line.split(",")[5] == '1'))

splitComma is a function that does some date calculations on the row data and returns 10 comma-delimited fields. Once I have that, I run the last filter as shown to pick up only rows where the value in field [5] is 1. So far everything is fine.
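For reference, a simplified sketch of what splitComma might look like (the exact field positions and date logic here are placeholders, not the actual implementation):

from datetime import datetime

def splitComma(line):
    # Hypothetical sketch: the real splitComma is not shown here.
    # It splits the raw CSV row, performs a date calculation, and
    # returns 10 comma-delimited fields as a single string.
    fields = line.split(",")
    # placeholder date logic on assumed date fields [3] and [4]
    start = datetime.strptime(fields[3], "%Y-%m-%d")
    end = datetime.strptime(fields[4], "%Y-%m-%d")
    day_shifted = (end - start).days
    return ",".join(fields[:6] + [str(day_shifted)] + fields[7:10])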

Next, I would like to convert segmentsRDD to a DataFrame with the schema shown below.

interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF(
    "itemid", "market", "itemkey", "start_offset", "end_offset",
    "time_shifted", "day_shifted", "tmsmarketid", "caption", "itemstarttime")

But I get an error saying a "pyspark.rdd.PipelinedRDD" cannot be converted to a DataFrame. Can you please explain the difference between a "pyspark.rdd.PipelinedRDD" and a "row RDD"? I am attempting to convert to a DF with the schema as shown. What am I missing here?

Thanks

Tags: pyspark, apache-spark-sql, rdd

Solution


You have to add the following lines to your code:

from pyspark.sql import SparkSession
spark = SparkSession(sc)

The .toDF() method is not an original method of the RDD API. If you look at the Spark source code, you will see that .toDF() is a monkey patch added when a SparkSession is initialized.
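The patch is applied when a SparkSession is created, roughly like this (paraphrased from pyspark/sql/session.py; details vary between Spark versions):

from pyspark.rdd import RDD

def _monkey_patch_RDD(sparkSession):
    def toDF(self, schema=None, sampleRatio=None):
        # delegates to the session's createDataFrame, which is why a
        # SparkSession must exist before rdd.toDF() can work
        return sparkSession.createDataFrame(self, schema, sampleRatio)
    RDD.toDF = toDF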

So, once a SparkSession has been initialized, you can call this monkey-patched method; in other words, when you run rdd.toDF(), you are really calling createDataFrame() from the DataFrame API under the hood.
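Putting it together, a minimal end-to-end sketch (assuming segmentsRDD is built as in the question):

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)  # registers the .toDF() monkey patch on RDDs

# The column names go in as a single list: RDD.toDF forwards them as the
# schema argument of createDataFrame, unlike DataFrame.toDF(*cols).
interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF(
    ["itemid", "market", "itemkey", "start_offset", "end_offset",
     "time_shifted", "day_shifted", "tmsmarketid", "caption", "itemstarttime"])
interim_segmentsDF.printSchema()

Note that on Spark 2.x and later the more common entry point is spark = SparkSession.builder.getOrCreate(), which creates (or reuses) the underlying SparkContext for you.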

