python - 按一列对 Spark df 进行分组并将一列的结果拆分为多列 - 透视和选择性合并
问题描述
我有以下火花df
请注意,只有当您已经通过运行以下命令安装了 spark 时,您才可以在本地运行它。否则,将问题复制到 Databricks 集群上,该集群将自动初始化 spark 上下文。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
spark_dataframe = pd.DataFrame({'id' : ['867', '430', '658', '157', '521', '867', '430', '867'],
'Probability':[0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12],
'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32', '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(spark_dataframe)
现在我想按“id”对这个 spark 数据框进行分组,并计算“RAG”列的值,将它们分成不同的列。所以得到这样的东西,
+---+--------------------+-------------+------------+
| id||G(count)|A(count)|R(count)|Timestamp(max) |
+---+--------------------+-------------+------------+
|867| 2| 1| 0|2020-07-01 17-49-32|
|430| 1| 0| 1|2020-07-01 17-49-32|
|658| 0| 1| 0|2020-07-01 17-49-32|
|157| 0| 0| 1|2020-07-01 17-49-32|
|521| 1| 0| 0|2020-07-01 17-49-32|
+---+--------------------+-------------+------------+
基于上面的 Spark 数据框,创建一个字典列表,例如:
final_list=[]
map_dictionary={"R":0.6, "A":0.3, "G":0.1}
final_list=[{"id": "867", "RAG": "G", "Timestamp": "2020-07-01 17-49-32"}, #because for the id 867 the G column had 2 counts greater than the rest A, R column values on the same row.
{"id": "430", "RAG": "R", "Timestamp": "2020-07-01 17-49-32"} #because G and R had 1 occurrence but R has greater weight based on the map dictionary,...
] #length of the list is equal to 5 since five are the unique rows of the spark df above.
解决方案
您可以对它们进行分组和旋转。
import pyspark.sql.functions as F
#%%
tst = sqlContext.createDataFrame([(867,0.12,'G','2020-07-01 17-49-32'),(430,0.72,'R','2020-07-01 17-49-32'),(658,0.32,'A','2020-07-01 17-49-32'),\
(157,0.83,'R','2020-07-01 17-49-32'),(521,0.12,'G','2020-07-01 17-49-32'),(867,0.49,'A','2020-07-01 16-45-32'),
(430,0.14,'G','2020-07-01 16-45-32'),(867,0.12,'G','2020-07-01 16-45-32')],
schema=['id','Probability','RAG','Timestamp'])
tst1 = tst.groupby('id').pivot('RAG').agg(F.count('Probability').alias('count'),F.max('Timestamp').alias('time_stamp'))
# there will be one time stamp per value of 'RAG'. The below code will find maximum among them
ts_coln = [F.col(x) for x in tst1.columns if 'time_stamp' in x]
tst2 = tst1.withColumn('max_ts',F.greatest(*ts_coln))
结果:
+---+-------+-------------------+--------+--------- ----------+--------+------------
-------+-------------------+
| id|A_count| A_time_stamp|G_count| G_time_stamp|R_count| R_time_stamp| max_ts|
+---+-------+-------------------+-------+-------------------+-------+-------------------+-------------------+
|658| 1|2020-07-01 17-49-32| 0| null| 0| null|2020-07-01 17-49-32|
|430| 0| null| 1|2020-07-01 16-45-32| 1|2020-07-01 17-49-32|2020-07-01 17-49-32|
|521| 0| null| 1|2020-07-01 17-49-32| 0| null|2020-07-01 17-49-32|
|157| 0| null| 0| null| 1|2020-07-01 17-49-32|2020-07-01 17-49-32|
|867| 1|2020-07-01 16-45-32| 2|2020-07-01 17-49-32| 0| null|2020-07-01 17-49-32|
+---+-------+-------------------+-------+-------------------+-------+-------------------+-------------------+
最后,您可以删除不相关的列
推荐阅读
- python - 下载的谷歌热图 png 不会加载到 Jupyter Notebook 中的自述文件中
- python - 将变量传递给 String vale,这是 Python 中的 JSON
- javascript - javascript selenium automaion 找不到这样的元素,无法找到元素
- sqlite - 使用包含 LIKE 运算符的逻辑表达式过滤特定列时出现意外输出
- eclipse - 如何获取从 Eclipse 项目的构建中排除的文件的名称?
- acumatica - 添加过滤器后,准备付款页面未显示 PayDate
- logstash - 如何解析值中的json对象
- hyperledger-fabric - 是否可以计算存储在超级账本结构区块链中的记录,我不想从 stateb 中获取数据
- android - FirebaseOffline 无法转换为 android.app.Activity
- geojson - 从 geoJson 源设置标记可见性