apache-spark - 如何在 spark 数据框中获取以下输入数据集的最大关闭日期和状态?
问题描述
我有一个下面的 spark 数据框,我需要检查作业是否已关闭。每个作业都可以有子作业,并且一旦所有子作业都关闭,则该作业被视为已关闭。请您建议在 pyspark 中实现这一目标的方法。
例如:输入 df
JobNum CloseDt ClosedFlg
12 N
12-01 2012-01-01 Y
12-02 2012-02-01 Y
13 2013-01-01 Y
14
14-01 2015-01-02 Y
14-02 N
输出_df:
JobNum IsClosedFlg Max_ClosedDt
12 Y 2012-02-01
13 Y 2013-01-01
14 N
解决方案
您可以分配一个按jobnum分区并按子jobnum降序排列的行号,并过滤行号= 1的行。
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'rn',
F.row_number().over(
Window.partitionBy(F.split('JobNum', '-')[0])
.orderBy(F.split('JobNum', '-')[1].desc())
)
).filter('rn = 1').select(
F.split('JobNum', '-')[0].alias('JobNum'),
F.col('ClosedFlg').alias('IsClosedFlg'),
F.col('CloseDt').alias('Max_ClosedDt')
)
df2.show()
+------+-----------+------------+
|JobNum|IsClosedFlg|Max_ClosedDt|
+------+-----------+------------+
| 12| Y| 2012-02-01|
| 13| Y| 2013-01-01|
| 14| N| null|
+------+-----------+------------+
推荐阅读
- arrays - 如何获取函数结果并将其转换为键值对数组
- gitlab - 由于未经授权的错误,无法使用 dotnet CLI 将 nuGet 包推送到 GitLab
- python - Gurobi 的问题 - 添加带有回调函数的用户剪辑
- linux - 如何根据其他 2 列的模式添加额外的列
- laravel - 如何让来自不同项目的 docker 容器相互通信
- javascript - chrome - windows 和 safari - mac 的输出完全不同
- arrays - 将匹配的列标题放在一列中并转置数据
- php - Laravel ajax 自动完成非常慢
- javascript - 将 JSX 附加到 ReactJS 中的状态变量
- python - Keras Fit_generator 未报告进度或在验证步骤中或未启用详细验证以进行验证