json - PySpark RDD - 获取排名,转换为 JSON
问题描述
我有一个 Hive 查询,它返回如下数据:
Date,Name,Score1,Score2,Avg_Score
1/1/2018,A,10,20,15
1/1/2018,B,20,20,20
1/1/2018,C,15,10,12.5
1/1/2018,D,11,12,11.5
1/1/2018,E,21,29,25
1/1/2018,F,10,21,15.5
我hive_context.sql(my_query).rdd
用来把它变成一个RDD。我的最终目标是将其转换为基于 Avg_score 降序排列的 JSON 格式,如下所示:
Scores=
[
{
"Date": '1/1/2018',
"Name": 'A',
"Avg_Score": 15,
"Rank":4
},
{
"Date": '1/1/2018',
"Name": 'B',
"Avg_Score": 20,
"Rank":2
}
]
作为获得排名的第一步,我尝试实施这种方法,但我一直遇到错误,例如AttributeError: 'RDD' object has no attribute 'withColumn'
我将如何完成这项工作?
解决方案
这是因为您在 RDD 级别工作。如果要使用 Dataframe API,则必须使用 Dataset(或 Dataframe)。正如对您问题的评论中提到的,您可以删除.rdd
转换并使用asDict
来获得最终结果。
df = sc.parallelize([
("1/1/2018","A",10,20,15.0),
("1/1/2018","B",20,20,20.0),
("1/1/2018","C",15,10,12.5),
("1/1/2018","D",11,12,11.5),
("1/1/2018","E",21,29,25.0),
("1/1/2018","F",10,21,15.5)]).toDF(["Date","Name","Score1","Score2","Avg_Score"])
from pyspark.sql import Window
import pyspark.sql.functions as psf
w = Window.orderBy(psf.desc("Avg_Score"))
rddDict = (df
.withColumn("rank",psf.dense_rank().over(w))
.drop("Score1","Score2")
.rdd
.map(lambda row: row.asDict()))
结果
>>> rddDict.take(1)
[{'Date': u'1/1/2018', 'Avg_Score': 25, 'Name': u'E', 'rank': 1}]
但是请注意使用不带分区的 Window 函数的警告:
18/08/13 11:44:32 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
推荐阅读
- java - 如何根据 Java 中的变量指定 ObjectMapper 中的第二个参数?
- javascript - Instagram 之类的封面选择器用于反应
- javascript - 尝试在 node.js 应用程序的猫鼬模式中向 mongoDb 添加消息历史记录
- post - 为什么谷歌分析不收集转化?
- push-notification - OneSignal external_user_id 和同一用户的多个设备
- c# - dotnet.core Asp.net 异步/等待无处不在?
- javascript - 你好!有谁知道为什么这不起作用?
- django - 通过 Django rest 框架返回模型字段名称
- node.js - 节点 fs:写入两倍于缓冲区长度的二进制文件
- linux - systemd 启动的进程看不到环境变量