mysql - 基于另一个表 PySpark/SQL 的日期时间列的聚合列
问题描述
我目前正在尝试使用另一个表中的日期对表的列执行与日期相关的聚合。表 1 包含用户 ID 和日期(以及其他未汇总的信息)。表 2 包含我希望汇总的值以及相同的 ID 和不同的日期。
目标是仅聚合表 2 中的值,前提是它们在表 1 中某一行的日期之前。
在下图中,所需的聚合函数是“均值”,但是如果可以提供通用 PySpark(或 SQL)解决方案,允许此聚合函数是内置的(F.mean,F.sum)或自定义用户-定义的函数,那将是理想的。
表 1 - 日期表(注意:用户 ID 可以在两个表中重复)
+---+---------- +----------
|USER| DATE |USER_STATE|
+---+---------- +----------
| 3 | 7/1/2019 | Current |
| 1 | 6/9/2019 | Expired |
| 1 | 1/1/2019 | Current |
+----+----------+-----------
表 2 - 聚合表
+---+---------- +----------
|USER|CHARGEDATE|AMOUNTPAID|
+---+---------- +----------
| 1 | 7/1/2018 | 10.00 |
| 1 | 5/1/2019 | 40.00 |
| 1 | 2/2/2019 | 10.00 |
| 3 | 1/2/2019 | 15.00 |
+----+----------+-----------
期望的输出- 聚合(平均值)是按用户计算的,取决于表 1 中的 CHARGEDATE 是否早于 DATE
+---+---------- +----------+---------------
|USER| DATE |USER_STATE|MEAN_AMOUNTPAID|
+---+---------- +----------+---------------
| 3 | 7/1/2019 | Current | 15.00 |
| 1 | 6/9/2019 | Expired | 20.00 |
| 1 | 1/1/2019 | Current | 10.00 |
+----+----------+----------+---------------
Row 2 - includes all user 1 Table 2 values because all ChargedDate< date
Row 3 - includes only includes user 1's row 1 Table 2 value because it's the only chargeddate less than date
我知道这可以通过在表 1 中的每一行上运行一个循环、获取该行的 DATE 并使用它来查询第二个表来低效地完成。如果可能的话,我正在寻找没有循环的解决方案。提前致谢!
解决方案
PySpark
做到这一点的方法将涉及将您的DATE
和CHARGEDATE
列转换DateType
为能够filter
on DATE>CHARGEDATE
。所以我假设你的日期是格式的"M/d/yyyy"
,如果是其他方式,只需将其替换为"d/M/yyyy"
#data.show()
#+----+--------+----------+
#|USER| DATE|USER_STATE|
#+----+--------+----------+
#| 3|7/1/2019| Current|
#| 1|6/9/2019| Expired|
#| 1|1/1/2019| Current|
#+----+--------+----------+
#aggregation.show()
#+----+----------+----------+
#|USER|CHARGEDATE|AMOUNTPAID|
#+----+----------+----------+
#| 1| 7/1/2018| 10.0|
#| 1| 5/1/2019| 40.0|
#| 1| 2/2/2019| 10.0|
#| 3| 1/2/2019| 15.0|
#+----+----------+----------+
from pyspark.sql import functions as F
data.join(aggregation,['USER'])\
.withColumn("DATE",F.to_date("DATE","M/d/yyyy"))\
.withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
.filter("DATE>CHARGEDATE")\
.groupBy("USER","DATE","USER_STATE").agg(F.mean("AMOUNTPAID").alias("mean_amount_paid"))\
.show()
+----+----------+----------+----------------+
|USER| DATE|USER_STATE|mean_amount_paid|
+----+----------+----------+----------------+
| 1|2019-06-09| Expired| 20.0|
| 1|2019-01-01| Current| 10.0|
| 3|2019-07-01| Current| 15.0|
+----+----------+----------+----------------+
推荐阅读
- java - Which Java API from the Azure SDK to manage PostgreSQL servers?
- python - MQTT 发布者是否可以在主题上定义允许的订阅者?
- ios - UIWebView 的 scalesPageToFit 但设置为 FALSE 的等效 WKWebView
- c# - 如何在 Wix 自定义引导程序应用程序中进行捆绑更新?
- java - 我如何才能使用 findAll() 而不是所有内容只返回特定信息?
- git - Azure devops:管道触发 CI 在不同存储库中的分支上构建
- google-cloud-platform - Google Cloud Storage - 在混合连接上编辑 VPN 隧道
- uinavigationcontroller - 隐藏导航栏 Swiftui
- java - 如何获取字符串的输入参数并将其转换为整数列表的名称?
- mongoose - Mongoose 在读取/查找时删除不在架构上的字段