首页 > 解决方案 > Pyspark:获取时间窗口之间的行数

问题描述

我有一些这种格式的数据:

user_id | date         | app_opened
123     | 2018-09-01   | 1
123     | 2018-09-01   | 1
123     | 2018-09-01   | 1
234     | 2018-08-23   | 1
234     | 2018-08-23   | 1
234     | 2018-08-21   | 1
234     | 2018-08-10   | 1

我正在尝试获取一天内打开的应用程序数量以及从当天开始在上周打开的应用程序数量。

这是我需要的输出:

user_id | date         | app_opened | app_open_day | app_open_week
123     | 2018-09-01   | 1          | 1            | 1
123     | 2018-09-01   | 1          | 2            | 2
123     | 2018-09-01   | 1          | 3            | 3
234     | 2018-08-23   | 1          | 1            | 1
234     | 2018-08-23   | 1          | 2            | 2
234     | 2018-08-21   | 1          | 1            | 3
234     | 2018-08-10   | 1          | 1            | 1

我在 pyspark 中使用窗口函数来获得所需的输出。我成功地得到了app_open_day计数,但我没有得到正确的app_open_week计数。

这是我的查询:

# For app_open_day
w1 = Window.partitionBy('user_id','date','app_opened').orderBy('date').rowsBetween(Window.unboundedPreceding,0)
df = df.select(col("*"), F.sum('app_opened').over(w1).alias("app_open_day"))

# For app_open_week
days = lambda i: i * 86400
w2 = (Window.partitionBy('user_id','date','app_opened').orderBy(date).rangeBetween(-days(7), 0))
df = df.select(col("*"), F.sum('app_opened').over(w2).alias("app_open_week"))

我没有得到我错的地方。请帮忙。TIA。

标签: pyspark

解决方案


您可以找到 app_open_week 字段的解决方案

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>> 
>>> df = sc.parallelize([
...     (123,'2018-09-01',1),
...     (123,'2018-09-01',1),
...     (123,'2018-09-01',1),
...     (234,'2018-08-23',1),
...     (234,'2018-08-23',1),
...     (234,'2018-08-21',1),
...     (234,'2018-08-10',1)
...     ]).toDF(['user_id','date','app_opened'])
>>> 
>>> window1 = Window.partitionBy('user_id')
>>> df = df.withColumn('max_date', F.max('date').over(window1))
>>> df = df.withColumn('date_diff', (F.datediff(F.to_date('max_date'),F.to_date('date'))/7).cast('integer'))
>>> 
>>> window2 = Window.partitionBy('user_id','date_diff').orderBy(F.desc('date'))
>>> df = df.withColumn('app_open_week', F.row_number().over(window2)).select('user_id','date','app_opened','app_open_week')
>>> 
>>> df.sort(["user_id", "date"], ascending=[1, 0]).show()
+-------+----------+----------+-------------+                                   
|user_id|      date|app_opened|app_open_week|
+-------+----------+----------+-------------+
|    123|2018-09-01|         1|            1|
|    123|2018-09-01|         1|            2|
|    123|2018-09-01|         1|            3|
|    234|2018-08-23|         1|            1|
|    234|2018-08-23|         1|            2|
|    234|2018-08-21|         1|            3|
|    234|2018-08-10|         1|            1|
+-------+----------+----------+-------------+

推荐阅读