Spark - Remove rows from a window based on a dynamic condition

Problem description

This is my DataFrame:

+------+------+-----------+---------+
| key1 | key2 | timestamp | status  |
+------+------+-----------+---------+
| AAA  | 111  | 1000      | event_a |
| AAA  | 111  | 1100      | null    |
| AAA  | 111  | 1200      | event_b |
| AAA  | 111  | 1300      | null    |
| AAA  | 222  | 1200      | event_a |
| AAA  | 222  | 1300      | event_b |
| AAA  | 222  | 1400      | null    |
| AAA  | 222  | 1500      | null    |
+------+------+-----------+---------+

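I want to group the rows by key1 and key2 (maybe with a window function?) and then, based on the timestamp, remove from each group every row that comes after that group's event_b.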

So the result would be:

+------+------+-----------+---------+
| key1 | key2 | timestamp | status  |
+------+------+-----------+---------+
| AAA  | 111  | 1000      | event_a |
| AAA  | 111  | 1100      | null    |
| AAA  | 111  | 1200      | event_b |
| AAA  | 222  | 1200      | event_a |
| AAA  | 222  | 1300      | event_b |
+------+------+-----------+---------+

There can be more statuses between event_a and event_b, but event_b is always the last status in a group and is followed only by nulls.
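To pin down the intended per-group rule before bringing Spark in, here is a minimal plain-Scala sketch over in-memory rows. The `Event` case class, the `EventBFilter` object and its `keepUpToEventB` function are hypothetical names introduced only for illustration, and the sketch assumes at most one event_b per (key1, key2) group, as the question implies.

```scala
// Hypothetical in-memory stand-in for one DataFrame row; a null status becomes None
final case class Event(key1: String, key2: Int, timestamp: Int, status: Option[String])

object EventBFilter {
  // Assumes at most one event_b per (key1, key2) group; with duplicates, toMap keeps the last one seen
  def keepUpToEventB(rows: Seq[Event]): Seq[Event] = {
    // Timestamp of event_b for each (key1, key2) group
    val bTimestamp: Map[(String, Int), Int] = rows.collect {
      case e if e.status.contains("event_b") => (e.key1, e.key2) -> e.timestamp
    }.toMap
    // Keep a row only if its group has an event_b and the row is not later than it
    rows.filter(e => bTimestamp.get((e.key1, e.key2)).exists(b => e.timestamp <= b))
  }

  // The sample data from the question
  val sample: Seq[Event] = Seq(
    Event("AAA", 111, 1000, Some("event_a")),
    Event("AAA", 111, 1100, None),
    Event("AAA", 111, 1200, Some("event_b")),
    Event("AAA", 111, 1300, None),
    Event("AAA", 222, 1200, Some("event_a")),
    Event("AAA", 222, 1300, Some("event_b")),
    Event("AAA", 222, 1400, None),
    Event("AAA", 222, 1500, None)
  )

  def main(args: Array[String]): Unit =
    keepUpToEventB(sample).foreach(println)
}
```

Running `main` prints the five rows of the expected result above. A group with no event_b at all is dropped entirely, which matches what an inner join against the event_b rows does.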

I know that rows can't be deleted from an existing DataFrame; by "remove" I mean producing a new DataFrame without them.

Tags: scala, apache-spark

Solution


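I don't think you need a window function here; even if you used one, you would still make two full passes over the data. One approach is to create a filtered DataFrame containing only the "event_b" rows, join it back to the original on the keys, and keep the rows whose timestamp is at or before the event_b timestamp.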

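import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell a session named `spark` already exists and getOrCreate() returns it
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // enables .toDF on a Seq of tuples

val testDF = Seq(
 ("AAA", 111, 1000, "event_a"),
 ("AAA", 111, 1100, null),
 ("AAA", 111, 1200, "event_b"),
 ("AAA", 111, 1300, null),
 ("AAA", 222, 1200, "event_a"),
 ("AAA", 222, 1300, "event_b"),
 ("AAA", 222, 1400, null),
 ("AAA", 222, 1500, null)
).toDF("key1", "key2", "timestamp", "status")

// One row per (key1, key2) group, carrying the timestamp of its event_b
val bDF = testDF.filter("status = 'event_b'")
  .withColumnRenamed("timestamp", "bTimestamp")
  .drop("status")

// The inner join attaches bTimestamp to every row of the same group
// (groups without an event_b disappear); then keep rows at or before event_b
val resultDF = testDF.join(bDF, Seq("key1", "key2"))
  .filter(col("timestamp") <= col("bTimestamp"))
  .drop("bTimestamp")

resultDF.orderBy("key1", "key2", "timestamp").show()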
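Since the question asked about window functions, here is a sketch of that variant for comparison. It computes each group's event_b timestamp with a conditional `max` over a window partitioned by `key1` and `key2`, then applies the same timestamp filter as the join. The object `DropAfterEventBWindow` and its method `dropAfterEventB` are hypothetical names. Whether this is cheaper than the join depends on the data (the answer above argues it does not save a pass), so it is worth comparing the physical plans with `explain()`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, when}

object DropAfterEventBWindow {
  // Hypothetical helper: keeps the rows at or before their group's event_b
  def dropAfterEventB(df: DataFrame): DataFrame = {
    // No orderBy, so the window frame is the whole (key1, key2) partition
    val byKeys = Window.partitionBy("key1", "key2")
    // when(...) without otherwise() is null on non-event_b rows and max skips nulls,
    // so this is the group's event_b timestamp, or null if the group has none
    val bTimestamp = max(when(col("status") === "event_b", col("timestamp"))).over(byKeys)
    df.withColumn("bTimestamp", bTimestamp)
      .filter(col("timestamp") <= col("bTimestamp")) // a comparison with null drops the row
      .drop("bTimestamp")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("drop-after-event-b").master("local[*]").getOrCreate()
    import spark.implicits._

    val testDF = Seq(
      ("AAA", 111, 1000, "event_a"),
      ("AAA", 111, 1100, null),
      ("AAA", 111, 1200, "event_b"),
      ("AAA", 111, 1300, null),
      ("AAA", 222, 1200, "event_a"),
      ("AAA", 222, 1300, "event_b"),
      ("AAA", 222, 1400, null),
      ("AAA", 222, 1500, null)
    ).toDF("key1", "key2", "timestamp", "status")

    dropAfterEventB(testDF).orderBy("key1", "key2", "timestamp").show()
    spark.stop()
  }
}
```

Like the inner join, this drops any group that has no event_b row, because comparing a timestamp against a null `bTimestamp` never passes the filter.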
