scala - 在 Spark 中基于窗口和条件创建新列
问题描述
初始数据帧:
+------------------------------+----------+-------+
| Timestamp | Property | Value |
+------------------------------+----------+-------+
| 2019-09-01T01:36:57.000+0000 | X | N |
| 2019-09-01T01:37:39.000+0000 | A | 3 |
| 2019-09-01T01:42:55.000+0000 | X | Y |
| 2019-09-01T01:53:44.000+0000 | A | 17 |
| 2019-09-01T01:55:34.000+0000 | A | 9 |
| 2019-09-01T01:57:32.000+0000 | X | N |
| 2019-09-01T02:59:40.000+0000 | A | 2 |
| 2019-09-01T02:00:03.000+0000 | A | 16 |
| 2019-09-01T02:01:40.000+0000 | X | Y |
| 2019-09-01T02:04:03.000+0000 | A | 21 |
+------------------------------+----------+-------+
最终数据框架:
+------------------------------+----------+-------+---+
| Timestamp | Property | Value | X |
+------------------------------+----------+-------+---+
| 2019-09-01T01:37:39.000+0000 | A | 3 | N |
| 2019-09-01T01:53:44.000+0000 | A | 17 | Y |
| 2019-09-01T01:55:34.000+0000 | A | 9 | Y |
| 2019-09-01T02:00:03.000+0000 | A | 16 | N |
| 2019-09-01T02:04:03.000+0000 | A | 21 | Y |
| 2019-09-01T02:59:40.000+0000 | A | 2 | Y |
+------------------------------+----------+-------+---+
基本上,我有一个时间戳、一个属性和一个值字段。该属性可以是A
或X
,并且它具有值。我想要一个新的 DataFrame,其中第四列X
基于X
属性的值命名。
- 我开始浏览从最早到最旧的行。
- 我遇到具有 X 属性的行,我存储它的值并将其插入 X 列。
- 如果我遇到 A 属性行:我将上一步中存储的值插入 X 列。
- ELSE(意味着我遇到一个 X 属性行):我更新存储的值(因为它是更新的)并将新的存储值插入 X 列。
- 我一直这样做,直到我完成了整个数据框。
- 我删除了带有 X 属性的行,以使最终数据框显示在上面。
我确信有某种方法可以使用 Window 函数有效地做到这一点。
解决方案
使用值 X 的值创建一个临时列,如果 A 则为空。然后使用窗口获取最后一个非空临时值。最后过滤属性“A”。
scala> val df = Seq(
| ("2019-09-01T01:36:57.000+0000", "X", "N"),
| ("2019-09-01T01:37:39.000+0000", "A", "3"),
| ("2019-09-01T01:42:55.000+0000", "X", "Y"),
| ("2019-09-01T01:53:44.000+0000", "A", "17"),
| ("2019-09-01T01:55:34.000+0000", "A", "9"),
| ("2019-09-01T01:57:32.000+0000", "X", "N"),
| ("2019-09-01T02:59:40.000+0000", "A", "2"),
| ("2019-09-01T02:00:03.000+0000", "A", "16"),
| ("2019-09-01T02:01:40.000+0000", "X", "Y"),
| ("2019-09-01T02:04:03.000+0000", "A", "21")
| ).toDF("Timestamp", "Property", "Value").withColumn("Temp", when($"Property" === "X", $"Value").otherwise(null))
df: org.apache.spark.sql.DataFrame = [Timestamp: string, Property: string ... 2 more fields]
scala> df.show(false)
+----------------------------+--------+-----+----+
|Timestamp |Property|Value|Temp|
+----------------------------+--------+-----+----+
|2019-09-01T01:36:57.000+0000|X |N |N |
|2019-09-01T01:37:39.000+0000|A |3 |null|
|2019-09-01T01:42:55.000+0000|X |Y |Y |
|2019-09-01T01:53:44.000+0000|A |17 |null|
|2019-09-01T01:55:34.000+0000|A |9 |null|
|2019-09-01T01:57:32.000+0000|X |N |N |
|2019-09-01T02:59:40.000+0000|A |2 |null|
|2019-09-01T02:00:03.000+0000|A |16 |null|
|2019-09-01T02:01:40.000+0000|X |Y |Y |
|2019-09-01T02:04:03.000+0000|A |21 |null|
+----------------------------+--------+-----+----+
scala> val overColumns = Window.orderBy("TimeStamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1b759662
scala> df.withColumn("X", last($"Temp",true).over(overColumns)).show(false)
+----------------------------+--------+-----+----+---+
|Timestamp |Property|Value|Temp|X |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:36:57.000+0000|X |N |N |N |
|2019-09-01T01:37:39.000+0000|A |3 |null|N |
|2019-09-01T01:42:55.000+0000|X |Y |Y |Y |
|2019-09-01T01:53:44.000+0000|A |17 |null|Y |
|2019-09-01T01:55:34.000+0000|A |9 |null|Y |
|2019-09-01T01:57:32.000+0000|X |N |N |N |
|2019-09-01T02:00:03.000+0000|A |16 |null|N |
|2019-09-01T02:01:40.000+0000|X |Y |Y |Y |
|2019-09-01T02:04:03.000+0000|A |21 |null|Y |
|2019-09-01T02:59:40.000+0000|A |2 |null|Y |
+----------------------------+--------+-----+----+---+
scala> df.withColumn("X", last($"Temp",true).over(overColumns)).filter($"Property" === "A").show(false)
+----------------------------+--------+-----+----+---+
|Timestamp |Property|Value|Temp|X |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:37:39.000+0000|A |3 |null|N |
|2019-09-01T01:53:44.000+0000|A |17 |null|Y |
|2019-09-01T01:55:34.000+0000|A |9 |null|Y |
|2019-09-01T02:00:03.000+0000|A |16 |null|N |
|2019-09-01T02:04:03.000+0000|A |21 |null|Y |
|2019-09-01T02:59:40.000+0000|A |2 |null|Y |
+----------------------------+--------+-----+----+---+
推荐阅读
- angular - Web浏览器上的离子角度测试PushNotifications
- javascript - 在 React.JS 中处理 API 调用的输入更改
- .net - RabbitMQ 异常“指定的端点均不可到达”
- c++ - 程序以获取所有输入而不显示任何内容结束
- vscode-extensions - 如何仅在 vscode 扩展的资源管理器上下文菜单中显示命令?
- r - 如何将函数结果添加到列表中
- python - 如何循环遍历 csv 文件并根据列表中的值创建列表?
- arrays - Arduino - Byte Aray 未更新
- mysql - 如何从数据库中区分出现次数多的元素
- android - 如何在 Android Studio 中单击时更改自定义按钮属性