pyspark - Pyspark:根据列和排名的总和删除/过滤行
问题描述
我有一个这样的数据框:
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 2|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 2|
|2020-05-11| 30| 120| -60| 2|
+----------+-----------+-----------+-----------+----+
对于每一个Date
我都有一个Total_Space
可以填满的。所以对于2020-05-10
,我有 60 秒,对于2020-05-11
我有 120 秒。
每个Date
也已经分配了具有一定Slot_Length
.
对于每个Date
,我已经计算了列中的空间量,Date
并Amount_Over
根据此处未显示的优先级列对它们进行了适当的排名。
我想做的是删除 a 最低的行,Rank
直到Date
sSlot_Length
加起来Total_Space
for a Date
。
+----------+-----------+-----------+-----------+----+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|
+----------+-----------+-----------+-----------+----+
|2020-05-10| 30| 60| -30| 1|
|2020-05-10| 30| 60| -30| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
|2020-05-11| 30| 120| -60| 1|
+----------+-----------+-----------+-----------+----+
在这个例子中,它就像把所有Rank
等于 2 一样简单,但是会有一些排名之间存在平局的例子,所以首先取最高的排名,如果有平局,则随机取一个。
做这个的最好方式是什么?我已经明白它需要一个 Window 函数在 Date 上正确地对 、 和 列进行Slot_Length
每次Total_Space
计算Amount_Over
。
解决方案
df = pd.DataFrame({"Date": ["2020-05-10", "2020-05-10", "2020-05-10", "2020-05-11", "2020-05-11", "2020-05-11",
"2020-05-11", "2020-05-11", "2020-05-11"],
"Slot_Length": [30, 30, 30, 30, 30, 30, 30, 30, 30],
"Total_Space": [60, 60, 60, 120, 120, 120, 120, 120, 120],
"Amount_Over": [-30, -30, -30, -60, -60, -60, -60, -60, -60],
"Rank": [1, 1, 2, 1, 1, 1, 1, 2, 2]})
df = spark.createDataFrame(df)
w = Window.partitionBy("Date").orderBy("Rank").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn(
"Cumulative_Sum", F.sum("Slot_Length").over(w)
).filter(
F.col("Cumulative_Sum") <= F.col("Total_Space")
).orderBy("Date","Rank","Cumulative_Sum").show()
结果
+----------+-----------+-----------+-----------+----+--------------+
| Date|Slot_Length|Total_Space|Amount_Over|Rank|Cumulative_Sum|
+----------+-----------+-----------+-----------+----+--------------+
|2020-05-10| 30| 60| -30| 1| 30|
|2020-05-10| 30| 60| -30| 1| 60|
|2020-05-11| 30| 120| -60| 1| 30|
|2020-05-11| 30| 120| -60| 1| 60|
|2020-05-11| 30| 120| -60| 1| 90|
|2020-05-11| 30| 120| -60| 1| 120|
+----------+-----------+-----------+-----------+----+--------------+
推荐阅读
- python-3.x - 无法列出 Azure 容器中的 Blob
- sql-server - SQL Server 使用大小写动态添加条件
- c - 如何读取 txt 文件并将信息放入 C 中的 struct 中?
- c# - 带有 .NetCore 依赖注入的 Log4Net 无法输出方法名称
- git - 结帐时的问题
- gradle - 在将闭包传递给 gradle 扩展时,为什么闭包的所有者不是主要的 Projects 对象?
- javascript - 如何使用存储在 db 中的用户电子邮件地址将消息发送给特定用户?
- sql - 查询返回机器运行时间
- dbeaver - 海狸 7.1.5 。转储数据库不起作用
- html - 不知道怎么做这个菜单