pyspark - Pyspark:获取时间窗口之间的行数
问题描述
我有一些这种格式的数据:
user_id | date | app_opened
123 | 2018-09-01 | 1
123 | 2018-09-01 | 1
123 | 2018-09-01 | 1
234 | 2018-08-23 | 1
234 | 2018-08-23 | 1
234 | 2018-08-21 | 1
234 | 2018-08-10 | 1
我正在尝试获取一天内打开的应用程序数量以及从当天开始在上周打开的应用程序数量。
这是我需要的输出:
user_id | date | app_opened | app_open_day | app_open_week
123 | 2018-09-01 | 1 | 1 | 1
123 | 2018-09-01 | 1 | 2 | 2
123 | 2018-09-01 | 1 | 3 | 3
234 | 2018-08-23 | 1 | 1 | 1
234 | 2018-08-23 | 1 | 2 | 2
234 | 2018-08-21 | 1 | 1 | 3
234 | 2018-08-10 | 1 | 1 | 1
我在 pyspark 中使用窗口函数来获得所需的输出。我成功地得到了app_open_day
计数,但我没有得到正确的app_open_week
计数。
这是我的查询:
# For app_open_day
w1 = Window.partitionBy('user_id','date','app_opened').orderBy('date').rowsBetween(Window.unboundedPreceding,0)
df = df.select(col("*"), F.sum('app_opened').over(w1).alias("app_open_day"))
# For app_open_week
days = lambda i: i * 86400
w2 = (Window.partitionBy('user_id','date','app_opened').orderBy(date).rangeBetween(-days(7), 0))
df = df.select(col("*"), F.sum('app_opened').over(w2).alias("app_open_week"))
我没有得到我错的地方。请帮忙。TIA。
解决方案
您可以找到 app_open_week 字段的解决方案
>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>>
>>> df = sc.parallelize([
... (123,'2018-09-01',1),
... (123,'2018-09-01',1),
... (123,'2018-09-01',1),
... (234,'2018-08-23',1),
... (234,'2018-08-23',1),
... (234,'2018-08-21',1),
... (234,'2018-08-10',1)
... ]).toDF(['user_id','date','app_opened'])
>>>
>>> window1 = Window.partitionBy('user_id')
>>> df = df.withColumn('max_date', F.max('date').over(window1))
>>> df = df.withColumn('date_diff', (F.datediff(F.to_date('max_date'),F.to_date('date'))/7).cast('integer'))
>>>
>>> window2 = Window.partitionBy('user_id','date_diff').orderBy(F.desc('date'))
>>> df = df.withColumn('app_open_week', F.row_number().over(window2)).select('user_id','date','app_opened','app_open_week')
>>>
>>> df.sort(["user_id", "date"], ascending=[1, 0]).show()
+-------+----------+----------+-------------+
|user_id| date|app_opened|app_open_week|
+-------+----------+----------+-------------+
| 123|2018-09-01| 1| 1|
| 123|2018-09-01| 1| 2|
| 123|2018-09-01| 1| 3|
| 234|2018-08-23| 1| 1|
| 234|2018-08-23| 1| 2|
| 234|2018-08-21| 1| 3|
| 234|2018-08-10| 1| 1|
+-------+----------+----------+-------------+
推荐阅读
- opencl - nvcc with OpenCL 3.0 fails to partition the GPU
- json - How to pass class as parameter to Jsoniter-Scala codec maker?
- wordpress - Wordpress ajax call returns entire html page, but not always
- go - array of struct object not getting return in response
- .net - Publish WebApi to Azure when update to .Net 5.0 from .Net Core 3.1
- java - How to set fixed headers to the feign client instead of setting on request level
- sql - 如何选择所有未来的关闭时间?
- python - 使用影响其他配置参数的一个可自定义参数进行配置
- node.js - RXJS 加密大文件并发送到 AWS S3
- r - 如何使用 R 中的“huxtable”库为表格中所需的单元格着色。哪种方式更优雅?