python - 复合需要从前一行计算并插入到 Pyspark 中的当前行
问题描述
有一个数据集,我在 Loan 和 Row 级别对其进行了分组,并为 Row =1 填充了一个基值。需要计算复合值并创建一个新列“Compounding_new_value”,其中 Condition :
- 如果 Base_val 不为 0,则将 Base_val 原样插入 Compounding_new_value,
- 下一行,当base_val为0时,第一次需要用公式(上一行的Base_value +(上一行的Base_val *当前行的利息值)计算复利值,然后将该值插入“Compounding_new_value”列
- 下一行,当 base_val 为 0 时,它将使用公式计算复利值 (Compounding_new_value from Previous row + (Compounding_new_value from Previous row * Interest value from Current Row) 并将该值插入到 Compounding_new_value 中,该值将成为下一行的输入,直到达到 Base_val为非零并从步骤 1 开始处理。
问题:它仅更新第一行的 Compounding_new_value,但不更新剩余行。
示例代码:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
simpleData = (
(1, "123", 2, 3000),
(2, "123", 2, 0),
(3, "123", 2, 0),
(4, "123", 2, 0),
(5, "123", 2, 0),
(6, "123", 2, 0),
(1, "234", 3, 2000),
(2, "234", 3, 0),
(3, "234", 3, 0),
(4, "234", 3, 0),
)
columns = ["Row", "Loan", "Interest", "Base_val"]
df = spark.createDataFrame(data=simpleData, schema=columns)
# Adding a column Compound_new_value having default 0 value
df = df.withColumn("Compound_new_value", F.lit(0))
WindowSpec = Window.partitionBy("loan").orderBy(F.col("loan").asc(), F.col("Row").asc())
df = df.withColumn(
"Compound_new_value",
F.when(
F.col("Base_val") == 0,
F.lag(
F.col("Compound_new_value", 1).over(WindowSpec)
+ (
F.lag(F.col("Compound_new_value"), 1).over(WindowSpec)
* F.col("Interest")
)
).otherwise(F.col("Base_val")),
),
)
df.show(10, False)
理想情况下,应为所有行更新 Compounding_new_value,但仅在第一行发生。当我重新执行以下数据框时,只会更新下一行的 Compounding_new_value。基本上,它需要一次更新所有行。
df = df.withColumn(
"Compound_new_value",
F.when(
F.col("Base_val") == 0,
F.lag(
F.col("Compound_new_value", 1).over(WindowSpec)
+ (
F.lag(F.col("Compound_new_value"), 1).over(WindowSpec)
* F.col("Interest")
)
).otherwise(F.col("Base_val")),
),
)
df.show(10, False)
重新执行数据帧后,它会更新第二行。
请帮忙,我做错了什么。
解决方案
interest
如果贷款不变,则此解决方案有效。
你需要做一些高中数学来解决这个问题:
Vi是第i层的 Compound_value 。
我是兴趣。
那么,Vi+1 = Vi + I*Vi = Vi * (1+I)
那么,Vi+1 = V0 * (1+I)^(i-1)
您只需要在数据框中应用此公式:
from pyspark.sql import functions as F, Window as W
df.withColumn(
"compound_value",
F.first("Base_val").over(W.partitionBy("Loan").orderBy("Row"))
* F.pow(1 + F.col("Interest"), F.col("row") - 1),
).show()
+---+----+--------+--------+--------------+
|Row|Loan|Interest|Base_val|compound_value|
+---+----+--------+--------+--------------+
| 1| 234| 3| 2000| 2000.0|
| 2| 234| 3| 0| 8000.0|
| 3| 234| 3| 0| 32000.0|
| 4| 234| 3| 0| 128000.0|
| 1| 123| 2| 3000| 3000.0|
| 2| 123| 2| 0| 9000.0|
| 3| 123| 2| 0| 27000.0|
| 4| 123| 2| 0| 81000.0|
| 5| 123| 2| 0| 243000.0|
| 6| 123| 2| 0| 729000.0|
+---+----+--------+--------+--------------+
推荐阅读
- python - 多个 .kv 文件处理 2 个 .kv 文件
- python - 将 SqlAlchemy 与 RabbitMQ 一起使用
- java - 非空字符串包装器
- php - 字数和阅读时间错误我无法在 Wordpress 中追踪和修复
- angular - 在从后台加载数据之前,如何从 dom 中隐藏角度材质 FormGroup?
- powershell - 在PowerShell中远程运行stopservice命令时如何重定向或隐藏系统参数的输出
- arrays - 关于solidity数组的问题
- java - 使用Java在一个范围内生成多个随机双数?
- c - realloc 破坏源内存
- reactjs - 在 https://name.github.io 上使用 reCAPTCHA