首页 > 解决方案 > 将数据框中带有时间戳的多行事件转换为带有开始和结束日期时间的单行

问题描述

我有单个设备的行,我想对按顺序发生的所有相同事件进行分组。

我也想用 pyspark 做这个

所以给出以下内容:

+--------------------+-------+
|      datetime      | event |
+--------------------+-------+
| 12-02-18T08:20:00  |     1 |
| 12-02-18T08:25:00  |     1 |
| 12-02-18T08:30:00  |     1 |
| 12-02-18T09:00:00  |     2 |
| 12-02-18T09:05:00  |     2 |
| 12-02-18T09:10:00  |     1 |
| 12-02-18T09:15:00  |     1 |
+--------------------+-------+

我想结束以下内容:

+-------------------+-------------------+-------+
|    start_time     |     end_time      | event |
+-------------------+-------------------+-------+
| 12-02-18T08:20:00 | 12-02-18T09:00:00 |     1 |
| 12-02-18T09:00:00 | 12-02-18T09:10:00 |     2 |
| 12-02-18T09:10:00 | null              |     1 |
+-------------------+-------------------+-------+

不会有重叠事件,因此不需要考虑。我教过用 UDF 做这件事,但想知道是否有人知道更优雅/更有效的方法。

标签: pythonapache-sparkpyspark

解决方案


使用 Florian 提供的方法(窗口函数),可以通过在 Scala 上获取具有更改事件的行,然后获取下一个更改日期来完成:

val df = List(
  ("12-02-18T08:20:00", 1),
  ("12-02-18T08:25:00", 1),
  ("12-02-18T08:30:00", 1),
  ("12-02-18T09:00:00", 2),
  ("12-02-18T09:05:00", 2),
  ("12-02-18T09:10:00", 1),
  ("12-02-18T09:15:00", 1)
).toDF("datetime", "event")
df.show(false)

val w = Window.orderBy("datetime")
val changedRowsOnlyDF = df.withColumn("changed", $"event" =!= lag($"event", 1, 0).over(w))
  .where($"changed")

val result = changedRowsOnlyDF
  .withColumn("end_time", lead($"datetime", 1).over(w))
  .drop("changed")
  .withColumnRenamed("datetime", "start_time")
result.show(false)

输出:

+-----------------+-----+-----------------+
|start_time       |event|end_time         |
+-----------------+-----+-----------------+
|12-02-18T08:20:00|1    |12-02-18T09:00:00|
|12-02-18T09:00:00|2    |12-02-18T09:10:00|
|12-02-18T09:10:00|1    |null             |
+-----------------+-----+-----------------+

免责声明:这种方法可用于少量数据,Spark 通过消息通知:

警告 org.apache.spark.sql.execution.window.WindowExec:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。


推荐阅读