pyspark - PySpark Groupby 和接收特定列
问题描述
如果我有如下数据框;
ProductId StoreId Prediction Index
24524 20 3 19
24524 20 5 20
24524 20 1 21
24524 20 2 22
24524 20 3 23
24524 20 1 24
24524 20 3 25
24524 20 4 26
24524 20 5 27
24524 20 6 28
24524 20 1 29
37654 23 8 9
37654 23 3 10
37654 23 4 11
37654 23 5 12
37654 23 6 13
37654 23 7 14
37654 23 8 15
37654 23 4 16
37654 23 2 17
37654 23 4 18
37654 23 3 19
37654 23 7 20
37654 23 7 21
37654 23 3 22
37654 23 2 23
37654 23 3 24
我想根据每个产品和商店平均最后 7 个索引。
ProductId StoreId Prediction(Average)
24524 20 3.28 #(This average is include Index 23, 24, 25, 26, 27, 28 and 29)
37654 23 4.14 #(This average is include Index 18, 19, 20, 21, 22, 23 and 24)
我应该如何处理 groupby?
df.groupBy(["ProductId","StoreId"]).agg({'Prediction':'avg'}))
你能帮我解决这个问题吗?
解决方案
可以通过 Window 函数来完成:
from pyspark.sql.window import Window
import pyspark.sql.functions as f
# create a Window function
col_list = ['ProductId', 'StoreId']
window = Window.partitionBy([col(x) for x in col_list]).orderBy(df['Index'].desc())
# select last 7 rows per partitions
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 7).drop('rank')
# calculate average
df.groupBy(["ProductId","StoreId"]).agg(f.avg(f.col("Prediction"))).show()
+---------+-------+------------------+
|ProductId|StoreId| avg(Prediction)|
+---------+-------+------------------+
| 37654| 23| 4.142857142857143|
| 24524| 20|3.2857142857142856|
+---------+-------+------------------+
推荐阅读
- angular - Angular 编译错误:NG6001:该类在 NgModule 'AppModule' 的声明中列出,但不是指令、组件或管道
- github - 为什么 Action 无法访问 Secret?
- java - 带有SpEL拆分的Spring @Value:无法处理(233)'é'
- excel - 复制并粘贴到下一个空单元格
- c++ - 检测到堆损坏 Malloc() Free()
- javascript - 如何创建一个在没有 jquery 的情况下触发另一个事件的事件?
- php - 如何在 PHP 中搜索 JSON 记录?
- python - 如何根据连续属性对类的对象进行分组?
- python - 如何计算熊猫中两个数据框之间每个月的相关性?
- javascript - MVC 将元素 id 和输入值传递给控制器操作