python - pyspark 使用窗口函数
问题描述
我有一个数据框,其中包含代表用户对特定电影评分的实例的行。每部电影可以由多个用户在多个类别中进行评分。这是我使用 movie_lens 数据创建的结果数据框。
|movie_id|year|categories|
+--------+----+----------+
| 122|1990| Comedy|
| 122|1990| Romance|
| 185|1990| Action|
| 185|1990| Crime|
| 185|1990| Thriller|
| 231|1990| Comedy|
| 292|1990| Action|
| 292|1990| Drama|
| 292|1990| Sci-Fi|
| 292|1990| Thriller|
| 316|1990| Action|
| 316|1990| Adventure|
| 316|1990| Sci-Fi|
| 329|1990| Action|
| 329|1990| Adventure|
| 329|1990| Drama|
.
.
.
movie_id 是电影的唯一 id,year 是用户评价电影的年份,category 是电影的 12 个类别之一。部分文件在这里
我想在每个类别中找到每个十年中评分最高的电影(计算每个类别中每个十年中每部电影的频率)
就像是
+-----------------------------------+
| year | category | movie_id | rank |
+-----------------------------------+
| 1990 | Comedy | 1273 | 1 |
| 1990 | Comedy | 6547 | 2 |
| 1990 | Comedy | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 1990 | Drama | 1273 | 1 |
| 1990 | Drama | 6547 | 2 |
| 1990 | Drama | 8973 | 3 |
.
.
| 1990 | Comedy | 7483 | 10 |
.
.
| 2000 | Comedy | 1273 | 1 |
| 2000 | Comedy | 6547 | 2 |
.
.
for every decade, top 10 movies in each category
我了解需要使用 pyspark 窗口功能。这是我尝试过的
windowSpec = Window.partitionBy(res_agg['year']).orderBy(res_agg['categories'].desc())
final = res_agg.select(res_agg['year'], res_agg['movie_id'], res_agg['categories']).withColumn('rank', func.rank().over(windowSpec))
但它返回如下内容:
+----+--------+------------------+----+
|year|movie_id| categories|rank|
+----+--------+------------------+----+
|2000| 8606|(no genres listed)| 1|
|2000| 1587| Action| 1|
|2000| 1518| Action| 1|
|2000| 2582| Action| 1|
|2000| 5460| Action| 1|
|2000| 27611| Action| 1|
|2000| 48304| Action| 1|
|2000| 54995| Action| 1|
|2000| 4629| Action| 1|
|2000| 26606| Action| 1|
|2000| 56775| Action| 1|
|2000| 62008| Action| 1|
我对 pyspark 很陌生,被困在这里。谁能指导我做错了什么。
解决方案
没错,您需要使用一个窗口,但首先,您需要执行第一次聚合来计算频率。
首先,让我们计算十年。
df_decade = df.withColumn("decade", concat(substring(col("year"), 0, 3), lit("0")))
然后我们按十年、类别和电影ID计算频率:
agg_df = df_decade\
.groupBy("decade", "category", "movie_id")\
.agg(count(col("*")).alias("freq"))
最后,我们定义了一个按十年和类别划分的窗口,并使用 rank 函数选择前 10 个:
w = Window.partitionBy("decade", "category").orderBy(desc("freq"))
top10 = agg_df.withColumn("r", rank().over(w)).where(col("r") <= 10)
推荐阅读
- reactjs - 更新嵌套状态对象中的单个记录,react-redux
- r - 使用 geom_smooth 添加回归线以绘制 R 中的离散 x 轴
- reactjs - 在基于类的组件中初始化反应钩子
- github - 无法在只读编辑器 Codesandbox 中编辑
- c# - 如何让滚动视图只影响内部框架,而不是整个应用程序
- pandas - 如何在 seaborn 散点图中绘制核密度估计
- c# - WPF comboboxitem 前景色改变
- reactjs - 如何将胜利图导出为png?
- google-apps-script - 如何使用 Google App 脚本读取 google 表单中选定的单选按钮值
- python - 网页在 selenium 中加载并到达末尾但不包含 div 内的所有元素