首页 > 解决方案 > 基于另一个表 PySpark/SQL 的日期时间列的聚合列

问题描述

我目前正在尝试使用另一个表中的日期对表的列执行与日期相关的聚合。表 1 包含用户 ID 和日期(以及其他未汇总的信息)。表 2 包含我希望汇总的值以及相同的 ID 和不同的日期。

目标是仅聚合表 2 中的值,前提是它们在表 1 中某一行的日期之前。

在下图中,所需的聚合函数是“均值”,但是如果可以提供通用 PySpark(或 SQL)解决方案,允许此聚合函数是内置的(F.mean,F.sum)自定义用户-定义的函数,那将是理想的。

表 1 - 日期表(注意:用户 ID 可以在两个表中重复)

+---+---------- +----------          
|USER|   DATE   |USER_STATE|   
+---+---------- +---------- 
|  3 | 7/1/2019 |  Current |
|  1 | 6/9/2019 |  Expired |
|  1 | 1/1/2019 |  Current |
+----+----------+----------- 

表 2 - 聚合表

+---+---------- +----------          
|USER|CHARGEDATE|AMOUNTPAID|   
+---+---------- +---------- 
|  1 | 7/1/2018 |  10.00   |
|  1 | 5/1/2019 |  40.00   |
|  1 | 2/2/2019 |  10.00   |
|  3 | 1/2/2019 |  15.00   |
+----+----------+----------- 

期望的输出- 聚合(平均值)是按用户计算的,取决于表 1 中的 CHARGEDATE 是否早于 DATE

+---+---------- +----------+---------------          
|USER|   DATE   |USER_STATE|MEAN_AMOUNTPAID|   
+---+---------- +----------+--------------- 
|  3 | 7/1/2019 |  Current |    15.00      |
|  1 | 6/9/2019 |  Expired |    20.00      | 
|  1 | 1/1/2019 |  Current |    10.00      |
+----+----------+----------+--------------- 
Row 2 - includes all user 1 Table 2 values because all ChargedDate< date 
Row 3 - includes only includes user 1's row 1 Table 2 value because it's the only chargeddate less than date

我知道这可以通过在表 1 中的每一行上运行一个循环、获取该行的 DATE 并使用它来查询第二个表来低效地完成。如果可能的话,我正在寻找没有循环的解决方案。提前致谢!

标签: mysqlsqlapache-sparkpysparkapache-spark-sql

解决方案


PySpark做到这一点的方法将涉及将您的DATECHARGEDATE列转换DateType为能够filteron DATE>CHARGEDATE。所以我假设你的日期是格式的"M/d/yyyy",如果是其他方式,只需将其替换为"d/M/yyyy"

#data.show()
#+----+--------+----------+
#|USER|    DATE|USER_STATE|
#+----+--------+----------+
#|   3|7/1/2019|   Current|
#|   1|6/9/2019|   Expired|
#|   1|1/1/2019|   Current|
#+----+--------+----------+

#aggregation.show()
#+----+----------+----------+
#|USER|CHARGEDATE|AMOUNTPAID|
#+----+----------+----------+
#|   1|  7/1/2018|      10.0|
#|   1|  5/1/2019|      40.0|
#|   1|  2/2/2019|      10.0|
#|   3|  1/2/2019|      15.0|
#+----+----------+----------+

from pyspark.sql import functions as F
data.join(aggregation,['USER'])\
      .withColumn("DATE",F.to_date("DATE","M/d/yyyy"))\
      .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
      .filter("DATE>CHARGEDATE")\
      .groupBy("USER","DATE","USER_STATE").agg(F.mean("AMOUNTPAID").alias("mean_amount_paid"))\
      .show()

+----+----------+----------+----------------+
|USER|      DATE|USER_STATE|mean_amount_paid|
+----+----------+----------+----------------+
|   1|2019-06-09|   Expired|            20.0|
|   1|2019-01-01|   Current|            10.0|
|   3|2019-07-01|   Current|            15.0|
+----+----------+----------+----------------+

推荐阅读