python - Python PySpark:每周计算行数,从周一开始,到周日结束
问题描述
我有一个包含以下列的数据框:
ID Scheduled Date
241 10/9/2018
423 9/25/2018
126 9/30/2018
123 8/13/2018
132 8/16/2018
143 10/6/2018
我想按周计算 ID 总数。具体来说,我希望一周总是从星期一开始,总是在星期日结束。
我已经在 Jupyter Notebook 中实现了这一点:
weekly_count_output = df.resample('W-Mon', on='Scheduled Date', label='left', closed='left').sum().query('count_row > 0')
weekly_count_output = weekly_count_output.reset_index()
weekly_count_output = weekly_count_output[['Scheduled Date', 'count_row']]
weekly_count_output = weekly_count_output.rename(columns = {'count_row': 'Total Count'})
但我不知道如何用 Python PySpark 语法编写上述代码。我希望我的结果输出如下所示:
Scheduled Date Total Count
8/13/2018 2
9/24/2018 2
10/1/2018 1
10/8/2018 1
请注意,预定日期始终是星期一(表示一周的开始),总计数从该周的星期一到星期日。
解决方案
感谢Get Last Monday in Spark定义了函数previous_day。
首先导入,
from pyspark.sql.functions import *
from datetime import datetime
假设您的输入数据在我的 df (DataFrame) 中
cols = ['id', 'scheduled_date']
vals = [
(241, '10/09/2018'),
(423, '09/25/2018'),
(126, '09/30/2018'),
(123, '08/13/2018'),
(132, '08/16/2018'),
(143, '10/06/2018')
]
df = spark.createDataFrame(vals, cols)
这是定义的函数
def previous_day(date, dayOfWeek):
return date_sub(next_day(date, 'monday'), 7)
# Converting the string column to timestamp.
df = df.withColumn('scheduled_date', date_format(unix_timestamp('scheduled_date', 'MM/dd/yyy') \
.cast('timestamp'), 'yyyy-MM-dd'))
df.show()
+---+--------------+
| id|scheduled_date|
+---+--------------+
|241| 2018-10-09|
|423| 2018-09-25|
|126| 2018-09-30|
|123| 2018-08-13|
|132| 2018-08-16|
|143| 2018-10-06|
+---+--------------+
# Returns the first monday of a week
df_mon = df.withColumn("scheduled_date", previous_day('scheduled_date', 'monday'))
df_mon.show()
+---+--------------+
| id|scheduled_date|
+---+--------------+
|241| 2018-10-08|
|423| 2018-09-24|
|126| 2018-09-24|
|123| 2018-08-13|
|132| 2018-08-13|
|143| 2018-10-01|
+---+--------------+
# You can groupBy and do agg count of 'id'.
df_mon_grp = df_mon.groupBy('scheduled_date').agg(count('id')).orderBy('scheduled_date')
# Reformatting to match your resulting output.
df_mon_grp = df_mon_grp.withColumn('scheduled_date', date_format(unix_timestamp('scheduled_date', "yyyy-MM-dd") \
.cast('timestamp'), 'MM/dd/yyyy'))
df_mon_grp.show()
+--------------+---------+
|scheduled_date|count(id)|
+--------------+---------+
| 08/13/2018| 2|
| 09/24/2018| 2|
| 10/01/2018| 1|
| 10/08/2018| 1|
+--------------+---------+
推荐阅读
- rust - 是否在重新分配可变变量时调用析构函数?
- python - 在模型中指定 upload_to 参数似乎没有做任何事情
- google-cloud-platform - Google Cloud Platform 权限问题(pub/sub -> Dataflow -> Google Cloud Storage bucket)
- java - 背景颜色/图像已绘制但形状不会显示
- javascript - 我正在创建一个代码来根据一天中的时间向我打招呼,由于某种原因,它无法将我的名字与其他文本区分开来
- jquery - 获取选中/未选中复选框值的总和并填充 div
- google-apps-script - 根据日期+时间锁定 Google 表格标签
- android - 如何设置验证规则以仅在 Cloud Firestore 中不存在具有相同字段的文档时才允许创建文档
- c - 在程序中使用 sem_wait
- shell - 从外壳启用 OEM 解锁