首页 > 解决方案 > 使用 pyspark 使用附加条件跟踪前一行值

问题描述

我正在使用 pyspark 生成一个数据框,只有当 amt = 0 时,我才需要用前一行的“amt”值更新“amt”列。

例如,下面是我的数据框

+---+-----+
| id|amt  |
+---+-----+
|  1|    5|
|  2|    0|
|  3|    0|
|  4|    6|
|  5|    0|
|  6|    3|
+---+-----+

现在,我想要创建以下 DF。每当 amt = 0 时,modi_amt col 将包含前一行的非零值,否则没有变化。

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         5|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

我能够获得前几行的值,但对于出现多个 0 amt 的行需要帮助(例如,id = 2,3)

我正在使用的代码:

from pyspark.sql.window import Window
my_window = Window.partitionBy().orderBy("id")
DF= DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))
DF= DF.withColumn("modi_amt",when(DF.amt== 0,DF.prev_amt).otherwise(DF.amt)).drop('prev_amt')

我得到以下 DF

+---+-----+----------+
| id|amt  |modi_amt  |
+---+-----+----------+
|  1|    5|         5|
|  2|    0|         5|
|  3|    0|         0|
|  4|    6|         6|
|  5|    0|         6|
|  6|    3|         3|
+---+-----+----------+

基本上 id 3 也应该有 modi_amt = 5

标签: python-3.xapache-sparkpysparkapache-spark-sqlpyspark-sql

解决方案


我已经使用下面的方法来获取输出并且它工作正常,

from pyspark.sql.window import Window
my_window = Window.partitionBy().orderBy("id")
# this will hold the previous col value
DF= DF.withColumn("prev_amt", F.lag(DF.amt).over(my_window))

# this will replace the amt 0 with previous column value, but not consecutive rows having 0 amt.  
DF = DF.withColumn("amt_adjusted",when(DF.prev_amt == 0,DF.prev_OffSet).otherwise(DF.amt))

# define null for the rows where both amt and amt_adjusted are having 0 (logic for consecutive rows having 0 amt)
DF = DF.withColumn('zeroNonZero', when((DF.amt== 0)&(DF.amt_adjusted == 0),lit(None)).otherwise(DF.amt_adjusted))

# replace all null values with previous Non zero amt row value
DF= DF.withColumn('modi_amt',last("zeroNonZero", ignorenulls= True).over(Window.orderBy("id").rowsBetween(Window.unboundedPreceding,0)))

还有其他更好的方法吗?


推荐阅读