首页 > 解决方案 > Pyspark RDD 消除值中的无?

问题描述

我是 pyspark RDD 的新手,并且有一个从 JSON 文件中获取的数据框:

行(created_at='2021-05-05 23:37:51', hash_tags=None, id=1390088382659895296, replyto_id=None, replyto_user_id=None, retweet_id=1390027514332991489, retweet_user_id=807095, text='RT @times: Breaking Newsny : 拜登政府将支持取消对 Covid-19 疫苗的专利保护,这是 glob 的一个突破……', user_id=17799542, user_mentions=[Row(id=807095, indices=[3, 11])])

这是我的所有代码:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


data_rdd = spark.read.option("multiline","true")\
    .json("tweets.json")
 
print(data_rdd.collect()[0])
def extractColumns(record):
    return (record[8],[record[4], record[6]])

ddata_frame = data_rdd.rdd.map(extractColumns)\
    .groupByKey()\
    .map(lambda r: (r[0], list(r[1])))

我以如下形式获得了RDD数据:[(17799542, [[None, 807095]]),

...

(3094649957,[[无,3094649957],[无,无],[无,3094649957],[无,无],[无,3094649957],[无,无]])]

如何消除 None 以实现以下值:[(17799542,[807095]),

...

(3094649957, [3094649957, 3094649957, 3094649957])]

我在下面尝试过但没有工作:

def eliminateNone(record):
    s = list(filter(lambda s: each != None for each in s))
    return (record[0], s)

data_frame.mapValues(eliminateNone)
print(data_frame.collect())

我很感激任何帮助。

标签: pysparkapache-spark-sqlrdd

解决方案


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


data_rdd = spark.read.option("multiline","true")\
    .json("tweets.json")


 
print(data_rdd.collect()[0])

def extractColumns(record):
    return (record[8],[record[3], record[5]])

def merge_values(data):
    result = []
    for l in data:
        for x in l:
            if x != None:
                result.append(x)
    return result

data_frame = data_rdd.rdd.map(extractColumns)\
    .groupByKey()\
    .map(lambda r: (r[0], list(r[1]))) 


data_frame = data_frame.mapValues(merge_values)
print(data_frame.collect())

你可以试试这个。


推荐阅读