首页 > 解决方案 > 如何使用火花窗口功能作为上一行到下一行的级联更改

问题描述

我尝试使用窗口函数以动态方式根据先前值计算当前值

    rowID | value
------------------
     1    | 5
     2    | 7
     3    | 6

逻辑:

If value > pre_value then value

所以在第 2 行,因为 7 > 5 然后value变成 5。最终结果应该是

    rowID | value
------------------
     1    | 5
     2    | 5
     3    | 5

然而使用lag().over(w)给出的结果为

    rowID | value
------------------
     1    | 5
     2    | 5
     3    | 6

它将第三行值 6 与“7”而不是新值“5”进行比较

任何建议如何实现这一目标?

标签: apache-sparkpyspark

解决方案


df.show()
#exampledataframe
+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    7|
|    3|    6|
|    4|    9|
|    5|    4|
|    6|    3|
+-----+-----+

您所需的逻辑对于窗口函数来说过于动态,因此,我们必须逐行更新我们的值。一种解决方案可能是udf收集的列表上使用普通 python ,然后在应用udf爆炸。如果数据相对较少,这应该没问题。(spark2.4 只是因为arrays_zip)。

from pyspark.sql import functions as F
from pyspark.sql.types import *
def add_one(a):
    for i in range(1,len(a)):
       if a[i]>a[i-1]:
           a[i]=a[i-1]
    return a
udf1= F.udf(add_one, ArrayType(IntegerType()))
df.agg(F.collect_list("rowID").alias("rowID"),F.collect_list("value").alias("value"))\
  .withColumn("value", udf1("value"))\
  .withColumn("zipped", F.explode(F.arrays_zip("rowID","value"))).select("zipped.*").show()

+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    5|    4|
|    6|    3|
+-----+-----+

UPDATE:

更好的是,由于您有 5000 个组,因此使用 aPandas vectorized udf( grouped MAP)应该对处理有很大帮助。而且您不必收集 5000 个整数的 collect_list并爆炸或使用pivot。我认为这应该是最佳解决方案。Pandas UDAF available for spark2.3+

下面的GroupBy 是空的,但您可以在其中添加分组列。

from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
   for i in range(1, len(df1)):
        if df1.loc[i, 'value']>df1.loc[i-1,'value']:
           df1.loc[i,'value']=df1.loc[i-1,'value']

   return df1
df.groupby().apply(grouped_map).show()

+-----+-----+
|rowID|value|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    5|    4|
|    6|    3|
+-----+-----+ 

推荐阅读