python - Pyspark 以周格式显示日期值,包括周开始日期和结束日期
问题描述
我有以下似乎很长的代码,是否有可以应用的简化格式来实现相同的结果。我想要实现的是获取一周的开始和结束日期并计算该特定周的记录。代码:创建一个数据框:
new_list = [
{"inv_dt":"01/01/2020","count":1},
{"inv_dt":"02/01/2020", "count":2},
{"inv_dt":"10/01/2020", "count":5},
{"inv_dt":"11/01/2020","count":1},
{"inv_dt":"12/01/2020", "count":5},
{"inv_dt":"20/01/2020", "count":3},
{"inv_dt":"22/01/2020", "count":2},
{"inv_dt":"28/01/2020", "count":1}
]
from pyspark.sql import functions as F
from pyspark.sql import Row
df = spark.createDataFrame(Row(**x) for x in new_list)
现在我将字符串转换为日期格式:
df = df.withColumn("inv_dt",F.to_date("inv_dt", "dd/MM/yyyy"))
df.show()
+----------+-----+
| inv_dt|count|
+----------+-----+
|2020-01-01| 1|
|2020-01-02| 2|
|2020-01-10| 5|
|2020-01-11| 1|
|2020-01-12| 5|
|2020-01-20| 3|
|2020-01-22| 2|
|2020-01-28| 1|
+----------+-----+
获得一年中的一周
df = df.withColumn('week_of_year',F.weekofyear(df.inv_dt))
df.show()
+----------+-----+------------+
| inv_dt|count|week_of_year|
+----------+-----+------------+
|2020-01-01| 1| 1|
|2020-01-02| 2| 1|
|2020-01-10| 5| 2|
|2020-01-11| 1| 2|
|2020-01-12| 5| 2|
|2020-01-20| 3| 4|
|2020-01-22| 2| 4|
|2020-01-28| 1| 5|
+----------+-----+------------+
使用 selectExpr 获取一周的开始和结束,加入开始和结束为 Week_Period,然后使用 groupby 获取每周计数
df = df.withColumn('day_of_week', F.dayofweek(F.col('inv_dt')))
df = df.selectExpr('*', 'date_sub(inv_dt, day_of_week-1) as week_start')
df = df.selectExpr('*', 'date_add(inv_dt, 7-day_of_week) as week_end')
df = df.withColumn('Week_Period', F.concat(F.col('week_start'),F.lit(' - '), F.col('week_end')))
list_of_columns = ['week_of_year','Week_Period']
df = df.groupby([F.col(x) for x in list_of_columns]).agg(F.sum(F.col('count')).alias('count'))
df.sort(df.week_of_year).show()
+------------+--------------------+-----+
|week_of_year| Week_Period|count|
+------------+--------------------+-----+
| 1|2019-12-29 - 2020...| 3|
| 2|2020-01-05 - 2020...| 6|
| 2|2020-01-12 - 2020...| 5|
| 4|2020-01-19 - 2020...| 5|
| 5|2020-01-26 - 2020...| 1|
+------------+--------------------+-----+
解决方案
这段代码更干净。
list_of_columns = ['week_of_year','Week_Period']
df\
.withColumn("day_of_week", F.dayofweek(F.col("inv_dt")))\
.withColumn("week_end", F.next_day(F.col("inv_dt"), 'Sat'))\
.withColumn("week_start", F.date_add(F.col("week_end"), -6))\
.withColumn('Week_Period', F.concat(F.col('week_start'),F.lit(' - '), F.col('week_end')))\
.groupby([F.col(x) for x in list_of_columns]).agg(F.sum(F.col('count')).alias('count'))\
.sort(df.week_of_year)\
.show(truncate = False)
+------------+-----------------------+-----+
|week_of_year|Week_Period |count|
+------------+-----------------------+-----+
|1 |2019-12-29 - 2020-01-04|3 |
|2 |2020-01-05 - 2020-01-11|5 |
|2 |2020-01-12 - 2020-01-18|6 |
|4 |2020-01-19 - 2020-01-25|5 |
|5 |2020-01-26 - 2020-02-01|1 |
+------------+-----------------------+-----+
推荐阅读
- jakarta-ee - 在 Wildfly 16 中启动 WAR 时出现 Infinispan NotSerializableException
- flutter - Flutter:在 Widget 的状态的 initState 中从 Navigator 获取传递的参数
- java - AES CTR - 如何附加到 CipherOutputStream?
- python - 为什么 Python 3 默认不将 python3 安装到虚拟环境中?
- javascript - 侦听 Socket.io 事件适用于服务器端,但不适用于客户端
- javascript - 程序动态选择
- python - 如何向 Python 中的特定线程抛出异常?
- dart - tts 未绑定(文本到语音)
- python - 在 Python 中使用 VS Code 进行远程 SSH 开发 - 挂起“启动 Jupyter 服务器”
- excel - 使用 PSExcel 模块编辑 CSV