python - 如何对 PySpark Dataframe 中的每一行执行复杂的处理
问题描述
我有一个 PySpark 数据框如下
Customer_ID Address_ID Order_ID Order_Date
Cust_1 Addr_1 1 31-Dec-20
Cust_1 Addr_1 2 23-Jan-21
Cust_1 Addr_1 3 06-Feb-21
Cust_1 Addr_2 4 13-Feb-21
Cust_1 Addr_2 5 20-Feb-21
Cust_1 Addr_3 6 18-Mar-21
Cust_1 Addr_3 7 23-Mar-21
Cust_2 Addr_4 8 31-Dec-20
Cust_2 Addr_4 9 23-Jan-21
Cust_2 Addr_4 10 06-Feb-21
Cust_2 Addr_4 11 13-Feb-21
Cust_2 Addr_4 12 20-Feb-21
Cust_2 Addr_5 13 18-Mar-21
Cust_2 Addr_5 14 23-Mar-21
列分别为customer id
、address id
和放置order id
date the order was
Order_ID
总是独一无二的
对于每个订单(每一行),我需要计算(customer c1,address a1)对的订单份额
用下式定义的ord_share(c1,a1)表示 order_share ,
The total number of orders between (Order_Date) and (Order_Date - 90 days) by c1 from a1
----------------------------------------------------------------------------------------
The total number of orders between (Order_Date) and (Order_Date - 90 days) for all addresses by c1
注——上式中的下划线表示除法
90 天是窗口大小。上表中的一个示例:(为了便于理解,
我将其保留为一小部分)order_share
对于 ORDER_ID 7:
订单总数为 7 - (by Cust_1)
ord_share (Cust_1,Addr_1) = 3/7, ord_share (Cust_1,Addr_2) = 2/7, ord_share (Cust_1,Addr_3) = 2/7
对于 ORDER_ID 6:
订单总数为 6 -(通过 Cust_1)
ord_share (Cust_1,Addr_1) = 3/6, ord_share (Cust_1,Addr_2) = 2/6, ord_share (Cust_1,Addr_3) = 1/6
对于 ORDER_ID 5:
订单总数为 5 - (by Cust_1)
ord_share (Cust_1,Addr_1) = 3, ord_share (Cust_1,Addr_2) = 2, ord_share (Cust_1,Addr_3) = 0
等等......我需要为所有行存储这些。我的输出格式应如下所示
(Is_original_address - 此列指的是 Address_ID 是否是下订单的原始地址)
Customer_ID Address_ID Order_ID Order_Share Is_original_address
Cust_1 Addr_1 7 3/7 0
Cust_1 Addr_2 7 2/7 0
Cust_1 Addr_3 7 2/7 1
Cust_1 Addr_1 6 3/6 0
Cust_1 Addr_2 6 2/6 0
Cust_1 Addr_3 6 1/6 1
Cust_1 Addr_1 5 3/5 0
Cust_1 Addr_2 5 2/5 1
Cust_1 Addr_3 5 0/5 0
.
.
.
For all rows
所以基本上,输入中的每一行都会根据客户的地址数量扩展到输出中的多行
注意- 初始数据框中的列未按排序顺序或按任何顺序分组,我只是选择这样一个示例来帮助解释
我发现很难解决这个问题。我想了很多,我似乎想不出任何加入/分组数据的方法来做到这一点,因为每一行都是独一无二的。我真的不确定如何获取输出数据帧。
从我的想法来看,我必须克隆原始数据框,并且对于每一行,我可能必须进行多个分组或连接。我真的不确定如何开始实施。
任何帮助,将不胜感激。谢谢!
如果需要任何其他信息,请告诉我。
解决方案
正如@Christophe 评论的那样,这使用窗口函数,但仅用于计算分母
data=[
('c1','a1', 1,'2020-12-31'),
('c1','a1', 2,'2021-01-23'),
('c1','a1', 3,'2021-02-06'),
('c1','a2', 4,'2021-02-13'),
('c1','a2', 5,'2021-02-20'),
('c1','a3', 6,'2021-03-18'),
('c1','a3', 7,'2021-03-23'),
('c2','a4', 8,'2020-12-31'),
('c2','a4', 9,'2021-01-23'),
('c2','a4',10,'2021-02-06'),
('c2','a4',11,'2021-02-13'),
('c2','a4',12,'2021-02-20'),
('c2','a5',13,'2021-03-18'),
('c2','a5',14,'2021-03-23'),
]
df = spark.createDataFrame(data=data, schema = ['c_id','a_id','order_id','order_date'])
df=df.select('c_id','a_id','order_id',F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')
spark.sql("""
WITH address_combinations AS (
SELECT o1.order_id, o2.c_id, o2.a_id
, CASE WHEN o1.a_id=o2.a_id THEN 1 ELSE 0 END AS is_original_address
, COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
FROM orders o1
JOIN orders o2 ON o1.c_id=o2.c_id
GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
, CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
, is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)
输出:
+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
| c1| a1| 1| 1/1| 1|
| c1| a2| 1| 0/1| 0|
| c1| a3| 1| 0/1| 0|
| c1| a1| 2| 2/2| 1|
| c1| a2| 2| 0/2| 0|
| c1| a3| 2| 0/2| 0|
| c1| a1| 3| 3/3| 1|
| c1| a2| 3| 0/3| 0|
| c1| a3| 3| 0/3| 0|
| c1| a1| 4| 3/4| 0|
| c1| a2| 4| 1/4| 1|
| c1| a3| 4| 0/4| 0|
| c1| a1| 5| 3/5| 0|
| c1| a2| 5| 2/5| 1|
| c1| a3| 5| 0/5| 0|
| c1| a1| 6| 3/6| 0|
| c1| a2| 6| 2/6| 0|
| c1| a3| 6| 1/6| 1|
| c1| a1| 7| 3/7| 0|
| c1| a2| 7| 2/7| 0|
| c1| a3| 7| 2/7| 1|
| c2| a4| 8| 1/1| 1|
| c2| a5| 8| 0/1| 0|
| c2| a4| 9| 2/2| 1|
| c2| a5| 9| 0/2| 0|
| c2| a4| 10| 3/3| 1|
| c2| a5| 10| 0/3| 0|
| c2| a4| 11| 4/4| 1|
| c2| a5| 11| 0/4| 0|
| c2| a4| 12| 5/5| 1|
| c2| a5| 12| 0/5| 0|
| c2| a4| 13| 5/6| 0|
| c2| a5| 13| 1/6| 1|
| c2| a4| 14| 5/7| 0|
| c2| a5| 14| 2/7| 1|
+----+----+--------+-----------+-------------------+
不确定这是否需要,但这里是使用 Python df
API 重新实现的完全相同的 SQL:
from pyspark.sql import Window
(
df.alias('o1')
.join(df.alias('o2')
, on=F.col('o1.c_id')==F.col('o2.c_id')
, how='inner'
)
.select(F.col('o1.order_id'), F.col('o2.c_id'), F.col('o2.a_id')
, F.when(F.datediff(F.col('o1.date'),F.col('o2.date')).between(0, 90), 1).alias('is_order')
, F.when(F.col('o1.a_id')==F.col('o2.a_id'), 1).otherwise(0).alias('is_original_address')
)
.groupby('order_id','c_id','a_id','is_original_address')
.agg(F.count('is_order').alias('num_orders'))
.select('c_id','a_id','order_id'
, F.concat(F.col('num_orders')
, F.lit('/')
, F.sum('num_orders').over(Window.partitionBy('order_id'))
).alias('order_share')
, 'is_original_address'
)
.sort('order_id','a_id')
).show(200)
快速解释:
address_combinations
首先将订单表与自身自连接以获得所有可能的组合。但是,可能存在重复,所以我们执行GROUP BY
和COUNT
90d 时间窗口内的订单数量。
下一部分简单地给我们分母并根据需要对其进行格式化(“x/y”)
希望这能满足您的要求!
推荐阅读
- ubuntu - Hyperledger Fabric 网络中的 CouchDB 错误 - 数据库不存在
- java - 通过参数继承 Spring REST 控制器
- botframework - 即使在用户发送任何消息之前,如何在团队中获得欢迎消息
- delphi - 从 TListViewItem PlaceOffset 创建一个新动画不起作用
- openshift - Openshift - 用于获取 Pod 的 ARTIFACT_URL 参数或其部署的应用程序版本的 API
- c - 我在c的cs50沙箱中混合了两个程序?
- node.js - 使用 node.js 将 CSV 导入 mongodb
- python - 以矩阵为输出的 NN 多元回归
- c# - 在 Visual Studio 中编写简单代码测试的最佳方法
- angular - Angular中的DomSanitizer不适用于Transform css属性