首页 > 解决方案 > 如何对 PySpark Dataframe 中的每一行执行复杂的处理

问题描述

我有一个 PySpark 数据框如下

Customer_ID   Address_ID    Order_ID    Order_Date
 Cust_1       Addr_1            1       31-Dec-20
 Cust_1       Addr_1            2       23-Jan-21
 Cust_1       Addr_1            3       06-Feb-21
 Cust_1       Addr_2            4       13-Feb-21
 Cust_1       Addr_2            5       20-Feb-21
 Cust_1       Addr_3            6       18-Mar-21
 Cust_1       Addr_3            7       23-Mar-21
 Cust_2       Addr_4            8       31-Dec-20
 Cust_2       Addr_4            9       23-Jan-21
 Cust_2       Addr_4            10      06-Feb-21
 Cust_2       Addr_4            11      13-Feb-21
 Cust_2       Addr_4            12      20-Feb-21
 Cust_2       Addr_5            13      18-Mar-21
 Cust_2       Addr_5            14      23-Mar-21

列分别为customer idaddress id和放置order iddate the order was

Order_ID总是独一无二的

对于每个订单(每一行),我需要计算(customer c1,address a1)对的订单份额

用下式定义的ord_share(c1,a1)表示 order_share ,

The total number of orders between (Order_Date) and (Order_Date - 90 days) by c1 from a1  
----------------------------------------------------------------------------------------
The total number of orders between (Order_Date) and (Order_Date - 90 days) for all addresses by c1  

——上式中的下划线表示除法

90 天是窗口大小。上表中的一个示例:(为了便于理解,
我将其保留为一小部分)order_share

对于 ORDER_ID 7
订单总数为 7 - (by Cust_1)
ord_share (Cust_1,Addr_1) = 3/7, ord_share (Cust_1,Addr_2) = 2/7, ord_share (Cust_1,Addr_3) = 2/7

对于 ORDER_ID 6
订单总数为 6 -(通过 Cust_1)
ord_share (Cust_1,Addr_1) = 3/6, ord_share (Cust_1,Addr_2) = 2/6, ord_share (Cust_1,Addr_3) = 1/6

对于 ORDER_ID 5
订单总数为 5 - (by Cust_1)
ord_share (Cust_1,Addr_1) = 3, ord_share (Cust_1,Addr_2) = 2, ord_share (Cust_1,Addr_3) = 0

等等......我需要为所有行存储这些。我的输出格式应如下所示

(Is_original_address - 此列指的是 Address_ID 是否是下订单的原始地址)

Customer_ID   Address_ID    Order_ID    Order_Share  Is_original_address

Cust_1         Addr_1            7       3/7           0 
Cust_1         Addr_2            7       2/7           0
Cust_1         Addr_3            7       2/7           1
Cust_1         Addr_1            6       3/6           0
Cust_1         Addr_2            6       2/6           0
Cust_1         Addr_3            6       1/6           1
Cust_1         Addr_1            5       3/5           0
Cust_1         Addr_2            5       2/5           1
Cust_1         Addr_3            5       0/5           0 
.
.
.
For all rows

所以基本上,输入中的每一行都会根据客户的地址数量扩展到输出中的多行

注意- 初始数据框中的列未按排序顺序或按任何顺序分组,我只是选择这样一个示例来帮助解释

我发现很难解决这个问题。我想了很多,我似乎想不出任何加入/分组数据的方法来做到这一点,因为每一行都是独一无二的。我真的不确定如何获取输出数据帧。
从我的想法来看,我必须克隆原始数据框,并且对于每一行,我可能必须进行多个分组或连接。我真的不确定如何开始实施。

任何帮助,将不胜感激。谢谢!

如果需要任何其他信息,请告诉我。

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


正如@Christophe 评论的那样,这使用窗口函数,但仅用于计算分母

data=[
    ('c1','a1', 1,'2020-12-31'),
    ('c1','a1', 2,'2021-01-23'),
    ('c1','a1', 3,'2021-02-06'),
    ('c1','a2', 4,'2021-02-13'),
    ('c1','a2', 5,'2021-02-20'),
    ('c1','a3', 6,'2021-03-18'),
    ('c1','a3', 7,'2021-03-23'),
    ('c2','a4', 8,'2020-12-31'),
    ('c2','a4', 9,'2021-01-23'),
    ('c2','a4',10,'2021-02-06'),
    ('c2','a4',11,'2021-02-13'),
    ('c2','a4',12,'2021-02-20'),
    ('c2','a5',13,'2021-03-18'),
    ('c2','a5',14,'2021-03-23'),
]
df = spark.createDataFrame(data=data, schema = ['c_id','a_id','order_id','order_date'])
df=df.select('c_id','a_id','order_id',F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')

spark.sql("""
WITH address_combinations AS (
  SELECT o1.order_id, o2.c_id, o2.a_id
  , CASE WHEN o1.a_id=o2.a_id THEN 1 ELSE 0 END AS is_original_address
  , COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
  FROM orders o1
  JOIN orders o2 ON o1.c_id=o2.c_id
  GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
, CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
, is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)

输出:

+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
|  c1|  a1|       1|        1/1|                  1|
|  c1|  a2|       1|        0/1|                  0|
|  c1|  a3|       1|        0/1|                  0|
|  c1|  a1|       2|        2/2|                  1|
|  c1|  a2|       2|        0/2|                  0|
|  c1|  a3|       2|        0/2|                  0|
|  c1|  a1|       3|        3/3|                  1|
|  c1|  a2|       3|        0/3|                  0|
|  c1|  a3|       3|        0/3|                  0|
|  c1|  a1|       4|        3/4|                  0|
|  c1|  a2|       4|        1/4|                  1|
|  c1|  a3|       4|        0/4|                  0|
|  c1|  a1|       5|        3/5|                  0|
|  c1|  a2|       5|        2/5|                  1|
|  c1|  a3|       5|        0/5|                  0|
|  c1|  a1|       6|        3/6|                  0|
|  c1|  a2|       6|        2/6|                  0|
|  c1|  a3|       6|        1/6|                  1|
|  c1|  a1|       7|        3/7|                  0|
|  c1|  a2|       7|        2/7|                  0|
|  c1|  a3|       7|        2/7|                  1|
|  c2|  a4|       8|        1/1|                  1|
|  c2|  a5|       8|        0/1|                  0|
|  c2|  a4|       9|        2/2|                  1|
|  c2|  a5|       9|        0/2|                  0|
|  c2|  a4|      10|        3/3|                  1|
|  c2|  a5|      10|        0/3|                  0|
|  c2|  a4|      11|        4/4|                  1|
|  c2|  a5|      11|        0/4|                  0|
|  c2|  a4|      12|        5/5|                  1|
|  c2|  a5|      12|        0/5|                  0|
|  c2|  a4|      13|        5/6|                  0|
|  c2|  a5|      13|        1/6|                  1|
|  c2|  a4|      14|        5/7|                  0|
|  c2|  a5|      14|        2/7|                  1|
+----+----+--------+-----------+-------------------+

不确定这是否需要,但这里是使用 Python dfAPI 重新实现的完全相同的 SQL:

from pyspark.sql import Window

(
    df.alias('o1')
    .join(df.alias('o2')
          , on=F.col('o1.c_id')==F.col('o2.c_id')
          , how='inner'
         )
    .select(F.col('o1.order_id'), F.col('o2.c_id'), F.col('o2.a_id')
            , F.when(F.datediff(F.col('o1.date'),F.col('o2.date')).between(0, 90), 1).alias('is_order')
            , F.when(F.col('o1.a_id')==F.col('o2.a_id'), 1).otherwise(0).alias('is_original_address')
           )
    .groupby('order_id','c_id','a_id','is_original_address')
    .agg(F.count('is_order').alias('num_orders'))
    .select('c_id','a_id','order_id'
            , F.concat(F.col('num_orders')
                       , F.lit('/')
                       , F.sum('num_orders').over(Window.partitionBy('order_id'))
                      ).alias('order_share')
            , 'is_original_address'
           )
    .sort('order_id','a_id')
).show(200)

快速解释: address_combinations首先将订单表与自身自连接以获得所有可能的组合。但是,可能存在重复,所以我们执行GROUP BYCOUNT90d 时间窗口内的订单数量。

下一部分简单地给我们分母并根据需要对其进行格式化(“x/y”)

希望这能满足您的要求!


推荐阅读