python - Pyspark - 获取具有条件的列的累积总和
问题描述
我有一个包含卡片、时间和金额的数据框,我需要在一个月的窗口内汇总卡片的金额(总和和计数)。
以下是数据的样子:
+--------------------+-------------------+------------+
| card_uid| date|amount_local|
+--------------------+-------------------+------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30| 8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18| 16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57| 16.19|
|card_003STfrgB8SZ...|2016-12-04 10:05:21| 58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25| 27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29| 12.99|
这是我到目前为止所做的。
+--------------------+-------------------+------------+----------------+
| card_uid| date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30| 8.99| 8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18| 16.19| 16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57| 16.19| 32.38|
|card_003STfrgB8SZ...|2016-12-04 10:05:21| 58.8| 58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25| 27.95| 27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29| 12.99| 40.94|
具有以下窗口功能:
partition = Window.partitionBy("card_uid").orderBy("date")
previousTransactionDate = data.withColumn("previous_tr_time", lag(data.date).over(partition)).select("transaction_id", "card_uid", "date", "previous_tr_time")
df_cum_sum = data.withColumn("duration_cum_sum", sum('amount_local').over(partition))
df_cum_sum.orderBy("card_uid","date").select("card_uid", "date", "amount_local", "duration_cum_sum").show()
但我唯一要补充的是两件事:
- 以相同方式聚合,仅当日期小于一个月时
- 为cum_sum输入零而不是相同的数量
所以所需的输出如下所示:
+--------------------+-------------------+------------+----------------+
| card_uid| date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30| 8.99| 0|
|card_0026uGZQwZQd...|2016-05-06 12:16:18| 16.19| 0|
|card_0026uGZQwZQd...|2016-05-12 12:17:57| 4.00| 16.19|
|card_0026uGZQwZQd...|2016-06-06 12:23:51| 16.19| 4.00| => Only 4 because de 16.19 was more than one month ago
|card_003STfrgB8SZ...|2016-12-04 10:05:21| 58.8| 0|
|card_005gBxyiDc6b...|2016-09-10 18:58:25| 27.95| 0|
|card_005gBxyiDc6b...|2016-09-12 11:18:29| 12.99| 27.95| => Previous amount
|card_005gBxyiDc6b...|2016-09-22 14:25:44| 23.99| 40.94| => 27.95 + 12.99
我不能 groupBy card_uid 因为我需要与原始数据相同的行数才能链接到另一个表
解决方案
您需要一个日期滚动窗口,窗口范围从过去 30 天到前一天。由于间隔函数不适用于窗口,您可以将日期转换为长值并使用天长值来创建窗口范围。
from pyspark.sql.functions import *
days = lambda i: i * 86400
partition = Window.partitionBy("card_uid").orderBy(col("date").cast("timestamp").cast("long")).rangeBetween(days(-30), days(-1))
df_cum_sum = data.withColumn("duration_cum_sum",sum(col('amount_local')).over(partition))\
.fillna(0,subset=['duration_cum_sum'])
df_cum_sum.show()
推荐阅读
- amazon-web-services - 如何使用 boto3 创建特定类型的 IAM 角色?
- lua - 文件写入中的错误“尝试索引布尔值”
- php - 如何覆盖 $_SERVER['REMOTE_ADDR'] 值以加载 IP 受限页面
- fix-protocol - FIXT1.1 ERROR_MISSING_EXECUTINGTRADER PartyRole
- wordpress - 如何在使用 Elementor 创建的产品页面模板上显示“产品不可用”消息
- java - 将类型 T 列表转换为对象
- neo4j - 存储/附加值列表作为 Neo4j 中关系的属性
- javascript - 如何使用 JavaScript 从 Firebase 存储中删除文件
- java - String.matches() 和 matcher.matches() 返回不同的结果
- django - Django Admin 内联对象