apache-spark - 如何使用火花窗口功能作为上一行到下一行的级联更改
问题描述
我尝试使用窗口函数以动态方式根据先前值计算当前值
rowID | value
------------------
1 | 5
2 | 7
3 | 6
逻辑:
If value > pre_value then value
所以在第 2 行,因为 7 > 5 然后value
变成 5。最终结果应该是
rowID | value
------------------
1 | 5
2 | 5
3 | 5
然而使用lag().over(w)
给出的结果为
rowID | value
------------------
1 | 5
2 | 5
3 | 6
它将第三行值 6 与“7”而不是新值“5”进行比较
任何建议如何实现这一目标?
解决方案
df.show()
#exampledataframe
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 7|
| 3| 6|
| 4| 9|
| 5| 4|
| 6| 3|
+-----+-----+
您所需的逻辑对于窗口函数来说过于动态,因此,我们必须逐行更新我们的值。一种解决方案可能是udf
在收集的列表上使用普通 python ,然后在应用后udf
爆炸。如果数据相对较少,这应该没问题。(spark2.4 只是因为arrays_zip
)。
from pyspark.sql import functions as F
from pyspark.sql.types import *
def add_one(a):
for i in range(1,len(a)):
if a[i]>a[i-1]:
a[i]=a[i-1]
return a
udf1= F.udf(add_one, ArrayType(IntegerType()))
df.agg(F.collect_list("rowID").alias("rowID"),F.collect_list("value").alias("value"))\
.withColumn("value", udf1("value"))\
.withColumn("zipped", F.explode(F.arrays_zip("rowID","value"))).select("zipped.*").show()
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 5|
| 3| 5|
| 4| 5|
| 5| 4|
| 6| 3|
+-----+-----+
UPDATE:
更好的是,由于您有 5000 个组,因此使用 aPandas vectorized udf( grouped MAP)
应该对处理有很大帮助。而且您不必收集 5000 个整数的 collect_list并爆炸或使用pivot。我认为这应该是最佳解决方案。Pandas UDAF available for spark2.3+
下面的GroupBy 是空的,但您可以在其中添加分组列。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
for i in range(1, len(df1)):
if df1.loc[i, 'value']>df1.loc[i-1,'value']:
df1.loc[i,'value']=df1.loc[i-1,'value']
return df1
df.groupby().apply(grouped_map).show()
+-----+-----+
|rowID|value|
+-----+-----+
| 1| 5|
| 2| 5|
| 3| 5|
| 4| 5|
| 5| 4|
| 6| 3|
+-----+-----+
推荐阅读
- angular - 将元素传递给 Angular 中的函数
- ruby-on-rails - Rails 控制器方法没有被调用
- python-3.x - Python迭代器在for-in循环内修改其列表时出现意外行为
- javascript - 如何在 PHP 中调试“遇到的非数字值”?
- ios - 用键盘向上移动 UITextField - swift 4.2 的变化?
- javascript - javascript中的反引号语法函数调用
- c# - 如何停止异步任务?
- java - 如何使用 jpa 执行本机 memsql 查询
- jupyter-notebook - Graphviz:如何在同一个图中包含多个图?
- python-3.x - 查找两个数组的每个元素的最小差异索引的最有效方法