首页 > 解决方案 > 在 groupBy scala spark 之后保留最近的行

问题描述

我有一个这样的数据框:

+--------+----------+-------------------+------+---------------+
|      ID|      DATE|               TIME|  COL1|           COL2|
+--------+----------+-------------------+------+---------------+
|21763789|2019-08-29|2019-08-29 17:08:06|  1   |            USA|
|29211238|2019-08-27|2019-08-27 11:04:42|  1   |          SPAIN|
| 1696884|2019-08-10|2019-08-10 21:07:57|  1   |         ITALIA|
|  211801|2019-08-06|2019-08-06 20:42:25|  1   |          SPAIN|
|20183201|2019-08-07|2019-08-07 16:59:09|  5001|          SPAIN|
|21763789|2019-08-27|2019-08-27 10:14:38|  1   |            USA|
|29211238|2019-08-14|2019-08-14 09:39:09|  1   |         ITALIA|
|20183201|2019-08-19|2019-08-19 21:30:29|  5001|            USA|
|29211238|2019-08-23|2019-08-23 19:00:25|  1   |            USA|
|  211801|2019-08-22|2019-08-22 05:22:28|  1   |            USA|
|  211801|2019-08-28|2019-08-28 11:58:33|  1   |         ITALIA|
|25648097|2019-08-30|2019-08-30 15:10:22|  2   |          SPAIN|
|29211238|2019-08-27|2019-08-27 11:04:44|  1   |          SPAIN|
|26295227|2019-08-25|2019-08-25 00:08:22|  1   |            USA|
|21763789|2019-08-20|2019-08-20 13:04:34|  1   |          SPAIN|
| 1696884|2019-08-23|2019-08-23 09:27:50|  1   |         ITALIA| 
| 6209818|2019-08-03|2019-08-03 14:52:25|  1   |         ITALIA|
|26295227|2019-08-21|2019-08-21 12:46:58|  1   |            USA|
|29211238|2019-08-22|2019-08-22 17:46:42|  1   |            USA|
|21763789|2019-08-07|2019-08-07 13:02:18|  1   |          SPAIN|
+--------+----------+-------------------+------+---------------+

我想按 ID 和 DATE 对这个数据框进行分组,然后我想只保留 TIME 列中的最新行:

df.groupBy(col("ID"), col("DATE")).agg(min(col("TIME"))) 也许可以,但是我还有很多其他列,那么我的聚合可能会破坏它们?

val onlyRecent = Window.partitionBy(col("ID"), col("DATE")).orderBy(col("TIME")) 我不知道这是否有用。

请问您有什么想法吗?谢谢

标签: scalaapache-spark

解决方案


您在正确的轨道上使用窗口功能。基本上,您希望以某种方式“标记”要保留的记录,然后按该标签进行过滤。您使用哪个标签函数完全取决于您要对TIME列中的重复项执行什么操作。以下将选择“绑定”记录之一(有效地随机)。

val w = Window.partitionBy($"ID", $"DATE").orderBy($"TIME".desc)

df.withColumn("rank", row_number().over(w)).where($"rank" === 1).drop("rank")

如果您想在出现“平局”时保留两条记录,请使用rank()dense_rank()代替row_number().


推荐阅读