python - Pyspark - 基于前一行值的增量值
问题描述
我有一个数据框,我需要根据以前的日期值修复一些包裹日期。这是一个例子:
+-------------------+---------------------+-----------------------+----------+
|account |contract |contract_parcel | date |
+-------------------+---------------------+-----------------------+----------+
| 92397| 1| 1|2020-12-07|
| 92397| 1| 2| null|
| 92397| 2| 1|2020-12-07|
| 92397| 2| 2| null|
| 92397| 2| 3| null|
| 92397| 2| 4| null|
| 92397| 2| 5| null|
| 92397| 2| 6| null|
| 92397| 3| 1|2021-01-04|
| 92397| 3| 2|2021-02-01|
+-------------------+---------------------+-----------------------+----------+
对于每个帐户,都有多个包含多个包裹的合同。对于那些日期列是null
我需要复制以前的包裹值但添加一个月的所有包裹等等。我尝试使用Window
withlag
和last
函数,但我无法根据之前的值更新日期。我只是设法复制它。
我需要一个像下面这样的输出:
+-------------------+---------------------+-----------------------+----------+
|account |contract |contract_parcel | date |
+-------------------+---------------------+-----------------------+----------+
| 92397| 1| 1|2020-12-07|
| 92397| 1| 2|2021-01-07|
| 92397| 2| 1|2020-12-07|
| 92397| 2| 2|2021-01-07|
| 92397| 2| 3|2021-02-07|
| 92397| 2| 4|2021-03-07|
| 92397| 2| 5|2021-04-07|
| 92397| 2| 6|2021-05-07|
| 92397| 3| 1|2021-01-04|
| 92397| 3| 2|2021-02-01|
+-------------------+---------------------+-----------------------+----------+
我也尝试通过迭代数据框,但性能很差。
解决方案
我所做的主要想法是首先进行累积总和,以使月份的值从最后一个非空日期开始增加。检索后,您可以将其传递给add_months函数以替换空值。
from pyspark.sql import Window
import pyspark.sql.functions as f
group_window = Window.partitionBy('account', 'contract').orderBy('contract_parcel')
add_month_window = Window.partitionBy('account', 'contract', 'group').orderBy('contract_parcel')
cumulative_df = (df
.withColumn('group', f.sum((f.col('date').isNotNull()).cast('int')).over(group_window))
.withColumn('add_month', f.sum(f.col('date').isNull().cast('int')).over(add_month_window)))
+-------+--------+---------------+----------+-----+---------+
|account|contract|contract_parcel|date |group|add_month|
+-------+--------+---------------+----------+-----+---------+
|92397 |1 |1 |2020-12-07|1 |0 |
|92397 |1 |2 |null |1 |1 |
|92397 |2 |1 |2020-12-07|1 |0 |
|92397 |2 |2 |null |1 |1 |
|92397 |2 |3 |null |1 |2 |
|92397 |2 |4 |null |1 |3 |
|92397 |2 |5 |null |1 |4 |
|92397 |2 |6 |null |1 |5 |
|92397 |3 |1 |2021-01-04|1 |0 |
|92397 |3 |2 |2021-02-01|2 |0 |
+-------+--------+---------------+----------+-----+---------+
replace_df = (cumulative_df
.withColumn('date', f.first('date').over(add_month_window))
.withColumn('date', f.expr('add_months(`date`, `add_month`)')))
+-------+--------+---------------+----------+-----+---------+
|account|contract|contract_parcel|date |group|add_month|
+-------+--------+---------------+----------+-----+---------+
|92397 |1 |1 |2020-12-07|1 |0 |
|92397 |1 |2 |2021-01-07|1 |1 |
|92397 |2 |1 |2020-12-07|1 |0 |
|92397 |2 |2 |2021-01-07|1 |1 |
|92397 |2 |3 |2021-02-07|1 |2 |
|92397 |2 |4 |2021-03-07|1 |3 |
|92397 |2 |5 |2021-04-07|1 |4 |
|92397 |2 |6 |2021-05-07|1 |5 |
|92397 |3 |1 |2021-01-04|1 |0 |
|92397 |3 |2 |2021-02-01|2 |0 |
+-------+--------+---------------+----------+-----+---------+
output_df = replace_df.drop('group', 'add_month')
output_df.show(truncate=False)
+-------+--------+---------------+----------+
|account|contract|contract_parcel|date |
+-------+--------+---------------+----------+
|92397 |1 |1 |2020-12-07|
|92397 |1 |2 |2021-01-07|
|92397 |2 |1 |2020-12-07|
|92397 |2 |2 |2021-01-07|
|92397 |2 |3 |2021-02-07|
|92397 |2 |4 |2021-03-07|
|92397 |2 |5 |2021-04-07|
|92397 |2 |6 |2021-05-07|
|92397 |3 |1 |2021-01-04|
|92397 |3 |2 |2021-02-01|
+-------+--------+---------------+----------+
推荐阅读
- sapui5 - 如何在智能过滤栏中设置输入框高度?
- node.js - 带有上传文件的 Nodejs 服务的 HTTP POST 请求在 FF 上 8-15 秒和 25 秒 Chrome 后出现 ERR
- r - 如何在 oec 库中为 getdata 函数编写嵌套 for 循环?
- php - laravel Homestead 以错误开头且没有输入文件
- javascript - 如何用vue实现echarts图表中图形元素的拖动
- amazon-dynamodb - 有没有办法通过在主键上应用过滤条件来从 dynamodb 表中检索项目
- css - 在滚动时更改 CSS 并影响可访问性
- python - 用户输出分析器将以元音形式打印的程序
- yii2 - Yii2 在选择 > 选项列表中添加额外的属性
- python - 使用调色板进行散点图时显示正确的图例