python - 如何在pyspark的一列上应用窗口函数?
问题描述
我有一个下面的数据框,它捕获每个管道运行的记录数:
对于相同的表名,我想覆盖现有记录并在该运行中保留最新的记录,例如当我在 7 月 26 日运行管道时,添加了 2 个新记录 def 和 lmn,因为 def 已经存在我想添加 666 def 记录本身,示例如下:
如何做到这一点?我使用了窗口功能,但这并没有解决问题。
window = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").desc())
a = a.withColumn('2019_07_26', F.first('2019_07_26').over(window))
解决方案
您可以使用dense_rank来归档它,请参见下面的示例:
from datetime import datetime
from pyspark.sql.window import *
import pyspark.sql.functions as F
data = [
("def",None,20, datetime(2017, 3, 12, 3, 19, 58)),
("ab",None, 20, datetime(2017, 3, 12, 3, 21, 30)),
("test",20, None, datetime(2017, 3, 13, 3, 29, 40)),
("def",20, None, datetime(2017, 3, 13, 3, 31, 23))
]
df = sqlContext.createDataFrame(data, ["tbl_name","2019","2020","updated_on"])
df.show()
+--------+----+----+-------------------+
|tbl_name|2019|2020| updated_on|
+--------+----+----+-------------------+
| def|null|null|2017-03-12 03:19:58|
| ab|null| 20|2017-03-12 03:21:30|
| test| 20|null|2017-03-13 03:29:40|
| def| 20|null|2017-03-13 03:31:23|
+--------+----+----+-------------------+
然后应用密集秩:
wd = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").asc())
wa = Window.partitionBy("tbl_name").orderBy(F.col("updated_on").desc())
df2 = df.select("tbl_name",
F.first("2019", ignorenulls=True).over(wa).alias("2019"),
F.first("2020", ignorenulls=True).over(wa).alias("2020"),
"updated_on",
F.dense_rank().over(wd).alias("rank")).filter(F.col("rank")==1).drop("rank")
导致:
+--------+----+----+-------------------+
|tbl_name|2019|2020| updated_on|
+--------+----+----+-------------------+
| ab|null| 20|2017-03-12 03:21:30|
| test| 20|null|2017-03-13 03:29:40|
| def| 20| 20|2017-03-12 03:19:58|
+--------+----+----+-------------------+
推荐阅读
- firebase-realtime-database - 如果我升级我的 Firebase 计划,情况是否解决了?
- c++ - 如何逐步更新 QMainWindow?
- azure - 如何在 Web App 上配置 IP 限制以仅允许通过应用程序网关的流量?
- python - AttributeError: '_io.TextIOWrapper' 对象没有属性 'append'
- javascript - 将对象数组减少到 x 个项目。试图保留至少 2 个特定对象属性值
- c# - 按升序排列列表框
- node.js - 如何使用 AdonisJS 验证号码
- excel - Excel VBA 中不存在选项卡时的错误处理
- javascript - 使用 Nightmare.js 根据类名选择和单击元素
- css - 使用 Koala 编译 .scss 时出错