首页 > 解决方案 > 当窗口/分区使用前向填充时,将条件添加到 pyspark sql 中的 last() 函数

问题描述

我得到的代码来自这个链接: https ://johnpaton.net/posts/forward-fill-spark/ 它有一些我想要完成的任务的背景。

from pyspark.sql import Window
from pyspark.sql.functions import last

# define the window
window = Window.partitionBy('location')\
               .orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column = last(spark_df['temperature'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('temp_filled_spark', filled_column)

基本上,last()函数用于查找最后一个非空值的状态。如果所有值都为 null,则返回 null。

但是,如果该组中的所有列都为空,我想分配一个默认值。我尝试了不同的方法,但无法弄清楚。

所以基本上,如果某个位置的温度都为空,我想有一种方法来设置它的默认值。

Some examples:
I want to fill them with default values for the case below:

location  temp                temp
1         null                0
1         null      =====>    0
1         null                0

I do not want to fill them with default values for the case below:

location  temp                 temp
1         null                 null
1          50      ======>      50
1          60                   60

标签: apache-sparkpysparkapache-spark-sql

解决方案


如果给定位置的任何记录包含非空值,也许您可​​以定义另一列作为指示符。例如:

window_2 = Window.partitionBy('location').rowsBetween(-sys.maxsize, sys.maxsize)
max_column = max(spark_df['temperature']).over(window_2)

然后,将该列与您的列一起filled_column有条件地填写最终结果:

temp_filled_spark = when(max_column.isNull(),0).otherwise(filled_column)
spark_df_filled = spark_df.withColumn('temp_filled_spark', temp_filled_spark)

可能不是很优雅或超级性能,但应该工作。


推荐阅读