首页 > 解决方案 > 如何从 Pyspark / Python 数据集中先前计算的列中获取值

问题描述

我正在尝试在 Pyspark / Python 表中创建一个新列(B)。新列(B)是:列(A)的当前值+列(B)的先前值的总和

所需的输出示例图像

`Id   a     b
1    977   977
2    3665  4642
3    1746  6388
4    2843  9231
5    200   9431`

当前列 B = 当前列 A + 先前列 B ;例如第 4 行:9231(B 列)= 2843(A 列)+ 6388(以前的 B 列值)

(对于第一行,因为 B 没有先前的值,所以它是 0)

请帮助我使用 Python / PySpark 查询代码

标签: pythonmysqlpysparkhiveapache-spark-sql

解决方案


如果没有上下文,我可能是错的,但似乎你试图做 A 列的累积总和:

from pyspark.sql.window import Window
import pyspark.sql.functions as sf

df = df.withColumn('B', sf.sum(df.A).over(Window.partitionBy().orderBy().rowsBetween(
Window.unboundedPreceding, 0)))

编辑:

如果您需要根据 B 的最后一个值迭代地添加新行,并假设数据框中 B 的值在此期间没有变化,我认为您最好将 B 记住在标准 python 变量中并构建以下行接着就,随即。

previous_B = 0
# your code to get new A
previous_B += new_A
new_row = spark.createDataFrame([(new_A, previous_B)])
df = df.union(new_row)

推荐阅读