首页 > 解决方案 > Pyspark - 基于前一行值的增量值

问题描述

我有一个数据框,我需要根据以前的日期值修复一些包裹日期。这是一个例子:

+-------------------+---------------------+-----------------------+----------+
|account            |contract             |contract_parcel        |  date    |
+-------------------+---------------------+-----------------------+----------+
|              92397|                    1|                      1|2020-12-07|
|              92397|                    1|                      2|      null|
|              92397|                    2|                      1|2020-12-07|
|              92397|                    2|                      2|      null|
|              92397|                    2|                      3|      null|
|              92397|                    2|                      4|      null|
|              92397|                    2|                      5|      null|
|              92397|                    2|                      6|      null|
|              92397|                    3|                      1|2021-01-04|
|              92397|                    3|                      2|2021-02-01|
+-------------------+---------------------+-----------------------+----------+

对于每个帐户,都有多个包含多个包裹的合同。对于那些日期列是null我需要复制以前的包裹值但添加一个月的所有包裹等等。我尝试使用Windowwithlaglast函数,但我无法根据之前的值更新日期。我只是设法复制它。

我需要一个像下面这样的输出:

+-------------------+---------------------+-----------------------+----------+
|account            |contract             |contract_parcel        |  date    |
+-------------------+---------------------+-----------------------+----------+
|              92397|                    1|                      1|2020-12-07|
|              92397|                    1|                      2|2021-01-07|
|              92397|                    2|                      1|2020-12-07|
|              92397|                    2|                      2|2021-01-07|
|              92397|                    2|                      3|2021-02-07|
|              92397|                    2|                      4|2021-03-07|
|              92397|                    2|                      5|2021-04-07|
|              92397|                    2|                      6|2021-05-07|
|              92397|                    3|                      1|2021-01-04|
|              92397|                    3|                      2|2021-02-01|
+-------------------+---------------------+-----------------------+----------+

我也尝试通过迭代数据框,但性能很差。

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


我所做的主要想法是首先进行累积总和,以使月份的值从最后一个非空日期开始增加。检索后,您可以将其传递给add_months函数以替换空值。

from pyspark.sql import Window
import pyspark.sql.functions as f


group_window = Window.partitionBy('account', 'contract').orderBy('contract_parcel')
add_month_window = Window.partitionBy('account', 'contract', 'group').orderBy('contract_parcel')

cumulative_df = (df
                 .withColumn('group',  f.sum((f.col('date').isNotNull()).cast('int')).over(group_window))  
                 .withColumn('add_month', f.sum(f.col('date').isNull().cast('int')).over(add_month_window)))
+-------+--------+---------------+----------+-----+---------+
|account|contract|contract_parcel|date      |group|add_month|
+-------+--------+---------------+----------+-----+---------+
|92397  |1       |1              |2020-12-07|1    |0        |
|92397  |1       |2              |null      |1    |1        |
|92397  |2       |1              |2020-12-07|1    |0        |
|92397  |2       |2              |null      |1    |1        |
|92397  |2       |3              |null      |1    |2        |
|92397  |2       |4              |null      |1    |3        |
|92397  |2       |5              |null      |1    |4        |
|92397  |2       |6              |null      |1    |5        |
|92397  |3       |1              |2021-01-04|1    |0        |
|92397  |3       |2              |2021-02-01|2    |0        |
+-------+--------+---------------+----------+-----+---------+

replace_df = (cumulative_df
              .withColumn('date', f.first('date').over(add_month_window))
              .withColumn('date', f.expr('add_months(`date`, `add_month`)')))
+-------+--------+---------------+----------+-----+---------+
|account|contract|contract_parcel|date      |group|add_month|
+-------+--------+---------------+----------+-----+---------+
|92397  |1       |1              |2020-12-07|1    |0        |
|92397  |1       |2              |2021-01-07|1    |1        |
|92397  |2       |1              |2020-12-07|1    |0        |
|92397  |2       |2              |2021-01-07|1    |1        |
|92397  |2       |3              |2021-02-07|1    |2        |
|92397  |2       |4              |2021-03-07|1    |3        |
|92397  |2       |5              |2021-04-07|1    |4        |
|92397  |2       |6              |2021-05-07|1    |5        |
|92397  |3       |1              |2021-01-04|1    |0        |
|92397  |3       |2              |2021-02-01|2    |0        |
+-------+--------+---------------+----------+-----+---------+

output_df = replace_df.drop('group', 'add_month')
output_df.show(truncate=False)
+-------+--------+---------------+----------+
|account|contract|contract_parcel|date      |
+-------+--------+---------------+----------+
|92397  |1       |1              |2020-12-07|
|92397  |1       |2              |2021-01-07|
|92397  |2       |1              |2020-12-07|
|92397  |2       |2              |2021-01-07|
|92397  |2       |3              |2021-02-07|
|92397  |2       |4              |2021-03-07|
|92397  |2       |5              |2021-04-07|
|92397  |2       |6              |2021-05-07|
|92397  |3       |1              |2021-01-04|
|92397  |3       |2              |2021-02-01|
+-------+--------+---------------+----------+

推荐阅读