首页 > 解决方案 > Pyspark 高级窗口函数

问题描述

这是我的数据框:

FlightDate=[20,40,51,50,60,15,17,37,36,50]
IssuingDate=[10,15,44,45,55,10,2,30,32,24]
Revenue = [100,50,40,70,60,40,30,100,200,100]
Customer = ['a','a','a','a','a','b','b','b','b','b']
df = spark.createDataFrame(pd.DataFrame([Customer,FlightDate,IssuingDate, Revenue]).T, schema=["Customer",'FlightDate', 'IssuingDate','Revenue'])
df.show()

+--------+----------+-----------+-------+
|Customer|FlightDate|IssuingDate|Revenue|
+--------+----------+-----------+-------+
|       a|        20|         10|    100|
|       a|        40|         15|     50|
|       a|        51|         44|     40|
|       a|        50|         45|     70|
|       a|        60|         55|     60|
|       b|        15|         10|     40|
|       b|        27|          2|     30|
|       b|        37|         30|    100|
|       b|        36|         32|    200|
|       b|        50|         24|    100|
+--------+----------+-----------+-------+

为了方便起见,我用了几天的数字。

对于每个客户,我想将研究的 FlightDate 和研究的 FlightDate + 10 天之间的所有发行日期的收入相加。

也就是说 :

这是期望的结果:

+--------+----------+-----------+-------+------+
|Customer|FlightDate|IssuingDate|Revenue|Result|
+--------+----------+-----------+-------+------+
|       a|        20|         10|    100|     0|
|       a|        40|         15|     50|   110|
|       a|        51|         44|     40|    60|
|       a|        50|         45|     70|    60|
|       a|        60|         55|     60|     0|
|       b|        15|         10|     40|   100|
|       b|        27|          2|     30|   300|
|       b|        37|         30|    100|     0|
|       b|        36|         32|    200|     0|
|       b|        50|         24|    100|     0|
+--------+----------+-----------+-------+------+

我知道它会涉及一些窗口功能,但这似乎有点棘手。谢谢

标签: pythonpysparkpyspark-sqlwindow-functions

解决方案


不需要窗口函数。它只是一个 join 和一个 agg :

df.alias("df").join(
    df.alias("df_2"),
    on=F.expr(
        "df.Customer = df_2.Customer "
        "and df_2.issuingdate between df.flightdate and df.flightdate+10"
    ), 
    how='left'
).groupBy(
    *('df.{}'.format(c) 
      for c 
      in df.columns)
).agg(
    F.sum(F.coalesce(
        "df_2.revenue", 
        F.lit(0))
    ).alias("result")
).show()

+--------+----------+-----------+-------+------+                                
|Customer|FlightDate|IssuingDate|Revenue|result|
+--------+----------+-----------+-------+------+
|       a|        20|         10|    100|     0|
|       a|        40|         15|     50|   110|
|       a|        50|         45|     70|    60|
|       a|        51|         44|     40|    60|
|       a|        60|         55|     60|     0|
|       b|        15|         10|     40|   100|
|       b|        27|          2|     30|   300|
|       b|        36|         32|    200|     0|
|       b|        37|         30|    100|     0|
|       b|        50|         24|    100|     0|
+--------+----------+-----------+-------+------+

推荐阅读