Creating a new column in Spark based on a window and a condition

Problem Description

Initial DataFrame:

+------------------------------+----------+-------+
|          Timestamp           | Property | Value |
+------------------------------+----------+-------+
| 2019-09-01T01:36:57.000+0000 | X        |     N |
| 2019-09-01T01:37:39.000+0000 | A        |     3 |
| 2019-09-01T01:42:55.000+0000 | X        |     Y |
| 2019-09-01T01:53:44.000+0000 | A        |    17 |
| 2019-09-01T01:55:34.000+0000 | A        |     9 |
| 2019-09-01T01:57:32.000+0000 | X        |     N |
| 2019-09-01T02:59:40.000+0000 | A        |     2 |
| 2019-09-01T02:00:03.000+0000 | A        |    16 |
| 2019-09-01T02:01:40.000+0000 | X        |     Y |
| 2019-09-01T02:04:03.000+0000 | A        |    21 |
+------------------------------+----------+-------+

Final DataFrame:

+------------------------------+----------+-------+---+
|          Timestamp           | Property | Value | X |
+------------------------------+----------+-------+---+
| 2019-09-01T01:37:39.000+0000 | A        |     3 | N |
| 2019-09-01T01:53:44.000+0000 | A        |    17 | Y |
| 2019-09-01T01:55:34.000+0000 | A        |     9 | Y |
| 2019-09-01T02:00:03.000+0000 | A        |    16 | N |
| 2019-09-01T02:04:03.000+0000 | A        |    21 | Y |
| 2019-09-01T02:59:40.000+0000 | A        |     2 | Y |
+------------------------------+----------+-------+---+

Basically, I have a Timestamp, a Property, and a Value field. The Property can be either A or X, and each row carries a Value. I want a new DataFrame with a fourth column, named X, filled in from the values of the X-property rows. The logic I have in mind is (a plain-Scala sketch follows the list):

  1. I start walking through the rows from the earliest to the latest.
  2. When I encounter a row with the X property, I store its value so it can be inserted into the X column.
  3. IF I encounter an A-property row: I insert the value stored in the previous step into the X column.
  4. ELSE (meaning I encounter an X-property row): I update the stored value (since it is more recent) and insert the new stored value into the X column.
  5. I keep doing this until I have gone through the entire DataFrame.
  6. I drop the rows with the X property, so that the final DataFrame looks like the one shown above.
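
In plain Scala, these steps amount to a forward fill of the latest X value. A minimal sketch (the Row case class and the forwardFillX helper are hypothetical names, used only to pin down the intended logic):

case class Row(ts: String, prop: String, value: String)

// Walk the rows in timestamp order, remember the latest X value,
// attach it to every A row, and drop the X rows themselves.
def forwardFillX(rows: Seq[Row]): Seq[(Row, String)] = {
  var lastX: String = null                        // most recently seen X value
  rows.sortBy(_.ts).flatMap { r =>                // ISO-8601 strings sort chronologically
    if (r.prop == "X") { lastX = r.value; None }  // steps 2/4: update the stored value
    else Some(r -> lastX)                         // step 3: attach the stored value
  }
}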

I am sure there is some way to do this efficiently using a Window function.

Tags: scala, apache-spark, pyspark

Solution


Create a temporary column holding the Value where Property is X, and null where it is A. Then use a window to take the last non-null Temp value. Finally, filter down to Property "A".

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df = Seq(
     |   ("2019-09-01T01:36:57.000+0000", "X", "N"),
     |   ("2019-09-01T01:37:39.000+0000", "A", "3"),
     |   ("2019-09-01T01:42:55.000+0000", "X", "Y"),
     |   ("2019-09-01T01:53:44.000+0000", "A", "17"),
     |   ("2019-09-01T01:55:34.000+0000", "A", "9"),
     |   ("2019-09-01T01:57:32.000+0000", "X", "N"),
     |   ("2019-09-01T02:59:40.000+0000", "A", "2"),
     |   ("2019-09-01T02:00:03.000+0000", "A", "16"),
     |   ("2019-09-01T02:01:40.000+0000", "X", "Y"),
     |   ("2019-09-01T02:04:03.000+0000", "A", "21")
     | ).toDF("Timestamp", "Property", "Value").withColumn("Temp", when($"Property" === "X", $"Value").otherwise(null))
df: org.apache.spark.sql.DataFrame = [Timestamp: string, Property: string ... 2 more fields]

scala> df.show(false)
+----------------------------+--------+-----+----+
|Timestamp                   |Property|Value|Temp|
+----------------------------+--------+-----+----+
|2019-09-01T01:36:57.000+0000|X       |N    |N   |
|2019-09-01T01:37:39.000+0000|A       |3    |null|
|2019-09-01T01:42:55.000+0000|X       |Y    |Y   |
|2019-09-01T01:53:44.000+0000|A       |17   |null|
|2019-09-01T01:55:34.000+0000|A       |9    |null|
|2019-09-01T01:57:32.000+0000|X       |N    |N   |
|2019-09-01T02:59:40.000+0000|A       |2    |null|
|2019-09-01T02:00:03.000+0000|A       |16   |null|
|2019-09-01T02:01:40.000+0000|X       |Y    |Y   |
|2019-09-01T02:04:03.000+0000|A       |21   |null|
+----------------------------+--------+-----+----+


scala> val overColumns = Window.orderBy("Timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1b759662

This window runs from the first row up to the current row in Timestamp order, so last($"Temp", true) (last with ignoreNulls = true) returns the most recent non-null Temp, i.e. the latest X value seen so far:

scala> df.withColumn("X", last($"Temp", true).over(overColumns)).show(false)
+----------------------------+--------+-----+----+---+
|Timestamp                   |Property|Value|Temp|X  |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:36:57.000+0000|X       |N    |N   |N  |
|2019-09-01T01:37:39.000+0000|A       |3    |null|N  |
|2019-09-01T01:42:55.000+0000|X       |Y    |Y   |Y  |
|2019-09-01T01:53:44.000+0000|A       |17   |null|Y  |
|2019-09-01T01:55:34.000+0000|A       |9    |null|Y  |
|2019-09-01T01:57:32.000+0000|X       |N    |N   |N  |
|2019-09-01T02:00:03.000+0000|A       |16   |null|N  |
|2019-09-01T02:01:40.000+0000|X       |Y    |Y   |Y  |
|2019-09-01T02:04:03.000+0000|A       |21   |null|Y  |
|2019-09-01T02:59:40.000+0000|A       |2    |null|Y  |
+----------------------------+--------+-----+----+---+

scala> df.withColumn("X", last($"Temp", true).over(overColumns)).filter($"Property" === "A").show(false)

+----------------------------+--------+-----+----+---+
|Timestamp                   |Property|Value|Temp|X  |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:37:39.000+0000|A       |3    |null|N  |
|2019-09-01T01:53:44.000+0000|A       |17   |null|Y  |
|2019-09-01T01:55:34.000+0000|A       |9    |null|Y  |
|2019-09-01T02:00:03.000+0000|A       |16   |null|N  |
|2019-09-01T02:04:03.000+0000|A       |21   |null|Y  |
|2019-09-01T02:59:40.000+0000|A       |2    |null|Y  |
+----------------------------+--------+-----+----+---+
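
To match the question's final DataFrame exactly, the helper column can also be dropped and the rows sorted by timestamp; plain drop/orderBy calls are enough (output omitted):

scala> df.withColumn("X", last($"Temp", true).over(overColumns)).filter($"Property" === "A").drop("Temp").orderBy("Timestamp").show(false)

One caveat: Window.orderBy without a partitionBy moves all rows into a single partition (Spark logs a warning about this). That is fine at this scale, but on large data a partition key should be added to the window where possible.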


