首页 > 解决方案 > Apache Spark (Scala) 与不同组的跨时间聚合

问题描述

我想要完成的是计算一艘船停泊的总时间。我正在处理的数据本质上是时间序列的。在从 A 点到 B 点的整个航程中,它可以多次停止和启动。

基本上,对于每个 ID(船舶唯一 ID),我想计算在锚点上花费的总时间(状态 ===“锚定”)。对于每个“锚定”时间段,取最后一个时间戳并从第一个时间戳中减去它(反之亦然,我只取绝对值)。如果一艘船在其旅程中只停留一次(窗口功能),我可以很容易地做到这一点。但是,当它在整个旅程中多次停止和启动时,我遇到了麻烦。窗口函数可以处理这个吗?

这是我正在处理的数据和预期输出的示例:

    val df = Seq(
        (123, "UNDERWAY", 0), 
        (123, "ANCHORED", 12), // first anchored (first time around)
        (123, "ANCHORED", 20), //take this timestamp and sub from previous
        (123, "UNDERWAY", 32), 
        (123, "UNDERWAY", 44), 
        (123, "ANCHORED", 50), // first anchored (second time around)
        (123, "ANCHORED", 65), 
        (123, "ANCHORED", 70), //take this timestamp and sub from previous
        (123, "ARRIVED", 79)
        ).toDF("id", "status", "time")

+---+--------+----+
|id |status  |time|
+---+--------+----+
|123|UNDERWAY|0   |
|123|ANCHORED|12  |
|123|ANCHORED|20  |
|123|UNDERWAY|32  |
|123|UNDERWAY|44  |
|123|ANCHORED|50  |
|123|ANCHORED|65  |
|123|ANCHORED|70  |
|123|ARRIVED |79  |
+---+--------+----+

// the resulting output I need is as follows (aggregation of total time spent at anchor)
// the ship spent 8 hours at anchor the first time, and then spent 
// 20 hours at anchor the second time. So total time is 28 hours
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+

船停泊的每个“航段”我想计算停泊时间,然后将所有这些航段加起来以获得停泊总时间。

标签: apache-sparkapache-spark-sql

解决方案


我是Window函数新手,所以它可能会做得更好,但这是我想出的:

此解决方案仅查看“this - previous”,而不是每个“组”状态中的“last - first”。但是,净效果应该是相同的,因为无论如何它都会将它们加在一起。

import org.apache.spark.sql.expressions.Window

val w = Window.orderBy($"time")

df.withColumn("tdiff", when($"status" === lag($"status", 1).over(w), $"time" - lag($"time", 1).over(w)))
  .where($"status" === lit("ANCHORED"))
  .groupBy("id", "status")
  .agg(sum("tdiff").as("timeSpentAtAnchor"))
  .select("id", "timeSpentAtAnchor")
  .show(false)

这使:

+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+

答案是由来自这个答案的信息形成的。而且,正如那里所说:

注意:由于此示例未使用任何分区,因此可能存在性能问题,在您的真实数据中,如果您的问题可以通过一些变量进行分区,将会很有帮助。


推荐阅读