首页 > 解决方案 > 复合需要从前一行计算并插入到 Pyspark 中的当前行

问题描述

有一个数据集,我在 Loan 和 Row 级别对其进行了分组,并为 Row =1 填充了一个基值。需要计算复合值并创建一个新列“Compounding_new_value”,其中 Condition :

  1. 如果 Base_val 不为 0,则将 Base_val 原样插入 Compounding_new_value,
  2. 下一行,当base_val为0时,第一次需要用公式(上一行的Base_value +(上一行的Base_val *当前行的利息值)计算复利值,然后将该值插入“Compounding_new_value”列
  3. 下一行,当 base_val 为 0 时,它将使用公式计算复利值 (Compounding_new_value from Previous row + (Compounding_new_value from Previous row * Interest value from Current Row) 并将该值插入到 Compounding_new_value 中,该值将成为下一行的输入,直到达到 Base_val为非零并从步骤 1 开始处理。

问题:它仅更新第一行的 Compounding_new_value,但不更新剩余行。

示例代码:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

simpleData = (
    (1, "123", 2, 3000),
    (2, "123", 2, 0),
    (3, "123", 2, 0),
    (4, "123", 2, 0),
    (5, "123", 2, 0),
    (6, "123", 2, 0),
    (1, "234", 3, 2000),
    (2, "234", 3, 0),
    (3, "234", 3, 0),
    (4, "234", 3, 0),
)

columns = ["Row", "Loan", "Interest", "Base_val"]
df = spark.createDataFrame(data=simpleData, schema=columns)
# Adding a column Compound_new_value having default 0 value
df = df.withColumn("Compound_new_value", F.lit(0))

数据的样子

WindowSpec = Window.partitionBy("loan").orderBy(F.col("loan").asc(), F.col("Row").asc())
df = df.withColumn(
    "Compound_new_value",
    F.when(
        F.col("Base_val") == 0,
        F.lag(
            F.col("Compound_new_value", 1).over(WindowSpec)
            + (
                F.lag(F.col("Compound_new_value"), 1).over(WindowSpec)
                * F.col("Interest")
            )
        ).otherwise(F.col("Base_val")),
    ),
)

df.show(10, False)

执行数据帧后

理想情况下,应为所有行更新 Compounding_new_value,但仅在第一行发生。当我重新执行以下数据框时,只会更新下一行的 Compounding_new_value。基本上,它需要一次更新所有行。

df = df.withColumn(
    "Compound_new_value",
    F.when(
        F.col("Base_val") == 0,
        F.lag(
            F.col("Compound_new_value", 1).over(WindowSpec)
            + (
                F.lag(F.col("Compound_new_value"), 1).over(WindowSpec)
                * F.col("Interest")
            )
        ).otherwise(F.col("Base_val")),
    ),
)

df.show(10, False)

重新执行数据帧后,它会更新第二行

重新执行数据帧后,它会更新第二行。

请帮忙,我做错了什么。

标签: pythondataframepyspark

解决方案


interest如果贷款不变,则此解决方案有效。

你需要做一些高中数学来解决这个问题:

Vi是第i层的 Compound_value 。
是兴趣。
那么,Vi+1 = Vi + I*Vi = Vi * (1+I)
那么,Vi+1 = V0 * (1+I)^(i-1)

您只需要在数据框中应用此公式:

from pyspark.sql import functions as F, Window as W

df.withColumn(
    "compound_value",
    F.first("Base_val").over(W.partitionBy("Loan").orderBy("Row"))
    * F.pow(1 + F.col("Interest"), F.col("row") - 1),
).show()

+---+----+--------+--------+--------------+                                     
|Row|Loan|Interest|Base_val|compound_value|
+---+----+--------+--------+--------------+
|  1| 234|       3|    2000|        2000.0|
|  2| 234|       3|       0|        8000.0|
|  3| 234|       3|       0|       32000.0|
|  4| 234|       3|       0|      128000.0|
|  1| 123|       2|    3000|        3000.0|
|  2| 123|       2|       0|        9000.0|
|  3| 123|       2|       0|       27000.0|
|  4| 123|       2|       0|       81000.0|
|  5| 123|       2|       0|      243000.0|
|  6| 123|       2|       0|      729000.0|
+---+----+--------+--------+--------------+

推荐阅读