首页 > 解决方案 > 如何在pyspark的一列上应用窗口函数?

问题描述

我有一个下面的数据框,它捕获每个管道运行的记录数:

在此处输入图像描述

对于相同的表名,我想覆盖现有记录并在该运行中保留最新的记录,例如当我在 7 月 26 日运行管道时,添加了 2 个新记录 def 和 lmn,因为 def 已经存在我想添加 666 def 记录本身,示例如下:

在此处输入图像描述

如何做到这一点?我使用了窗口功能,但这并没有解决问题。

window = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").desc())
a = a.withColumn('2019_07_26', F.first('2019_07_26').over(window))

标签: pythondataframepyspark

解决方案


您可以使用dense_rank来归档它,请参见下面的示例:

from datetime import datetime
from pyspark.sql.window import *
import pyspark.sql.functions as F  
data = [
  ("def",None,20, datetime(2017, 3, 12, 3, 19, 58)),
  ("ab",None, 20, datetime(2017, 3, 12, 3, 21, 30)),
  ("test",20, None, datetime(2017, 3, 13, 3, 29, 40)),
  ("def",20, None, datetime(2017, 3, 13, 3, 31, 23))
]
df = sqlContext.createDataFrame(data, ["tbl_name","2019","2020","updated_on"])
df.show()
+--------+----+----+-------------------+
|tbl_name|2019|2020|         updated_on|
+--------+----+----+-------------------+
|     def|null|null|2017-03-12 03:19:58|
|      ab|null|  20|2017-03-12 03:21:30|
|    test|  20|null|2017-03-13 03:29:40|
|     def|  20|null|2017-03-13 03:31:23|
+--------+----+----+-------------------+

然后应用密集秩:

wd = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").asc())
wa = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").desc())
df2 = df.select("tbl_name",
                F.first("2019", ignorenulls=True).over(wa).alias("2019"),
                F.first("2020", ignorenulls=True).over(wa).alias("2020"),
                "updated_on",

F.dense_rank().over(wd).alias("rank")).filter(F.col("rank")==1).drop("rank")

导致:

+--------+----+----+-------------------+
|tbl_name|2019|2020|         updated_on|
+--------+----+----+-------------------+
|      ab|null|  20|2017-03-12 03:21:30|
|    test|  20|null|2017-03-13 03:29:40|
|     def|  20|  20|2017-03-12 03:19:58|
+--------+----+----+-------------------+

推荐阅读