PySpark RDD - get rank, convert to JSON

Problem description

I have a Hive query that returns data like this:

Date,Name,Score1,Score2,Avg_Score
1/1/2018,A,10,20,15
1/1/2018,B,20,20,20
1/1/2018,C,15,10,12.5
1/1/2018,D,11,12,11.5
1/1/2018,E,21,29,25
1/1/2018,F,10,21,15.5

I use hive_context.sql(my_query).rdd to turn this into an RDD. My end goal is to convert it to JSON, ranked by Avg_Score in descending order, like this:

Scores=
[
    {
        "Date": "1/1/2018",
        "Name": "A",
        "Avg_Score": 15,
        "Rank": 4
    },
    {
        "Date": "1/1/2018",
        "Name": "B",
        "Avg_Score": 20,
        "Rank": 2
    }
]

As a first step toward getting the rank, I tried implementing this approach, but I keep running into errors such as AttributeError: 'RDD' object has no attribute 'withColumn'.

How would I go about accomplishing this?

Tags: json, apache-spark, pyspark, apache-spark-sql

Solution


This is because you are working at the RDD level. If you want to use the DataFrame API, you have to stay with a Dataset (or DataFrame). As mentioned in the comments on your question, you can drop the .rdd conversion, apply the window function on the DataFrame, and then use asDict to get the final result.

# Recreate the sample data as a DataFrame (rather than calling .rdd on the query result)
df = sc.parallelize([
  ("1/1/2018","A",10,20,15.0),
  ("1/1/2018","B",20,20,20.0),
  ("1/1/2018","C",15,10,12.5),
  ("1/1/2018","D",11,12,11.5),
  ("1/1/2018","E",21,29,25.0),
  ("1/1/2018","F",10,21,15.5)]).toDF(["Date","Name","Score1","Score2","Avg_Score"])

from pyspark.sql import Window
import pyspark.sql.functions as psf

# Global ordering by Avg_Score, descending; no partitionBy, hence the warning below
w = Window.orderBy(psf.desc("Avg_Score"))

rddDict = (df
  .withColumn("rank", psf.dense_rank().over(w))  # rank every row by Avg_Score
  .drop("Score1", "Score2")                      # keep only the fields needed in the JSON
  .rdd
  .map(lambda row: row.asDict()))                # Row -> plain Python dict

Result

>>> rddDict.take(1)
[{'Date': u'1/1/2018', 'Avg_Score': 25, 'Name': u'E', 'rank': 1}]
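
If the final artifact should be an actual JSON document rather than an RDD of Python dicts, a minimal follow-up sketch (assuming the ranked result is small enough to collect to the driver) is to serialize the collected list with the standard json module:

import json

# Assumption: the ranked result fits in driver memory.
# Collect the dicts and dump them as a single JSON array, ordered by rank.
scores = sorted(rddDict.collect(), key=lambda d: d["rank"])
scores_json = json.dumps(scores)

For larger results, df.toJSON() or df.write.json(path) keep the serialization distributed instead of collecting everything to the driver.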

Note, however, the warning you get when using a window function without a partition:

18/08/13 11:44:32 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
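
That warning is expected for a global ranking, since every row has to be compared against every other and therefore ends up in one partition. If, hypothetically, your real data spans multiple dates and a per-date rank is acceptable, adding a partitionBy avoids the single-partition shuffle (a sketch under that assumption):

# Hypothetical variant: rank within each Date instead of globally.
# partitionBy lets Spark compute each date's ranking in parallel.
w_per_date = Window.partitionBy("Date").orderBy(psf.desc("Avg_Score"))

ranked_per_date = (df
  .withColumn("rank", psf.dense_rank().over(w_per_date))
  .drop("Score1", "Score2"))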
