apache-spark - Apache Spark (Scala) 与不同组的跨时间聚合
问题描述
我想要完成的是计算一艘船停泊的总时间。我正在处理的数据本质上是时间序列的。在从 A 点到 B 点的整个航程中,它可以多次停止和启动。
基本上,对于每个 ID(船舶唯一 ID),我想计算在锚点上花费的总时间(状态 ===“锚定”)。对于每个“锚定”时间段,取最后一个时间戳并从第一个时间戳中减去它(反之亦然,我只取绝对值)。如果一艘船在其旅程中只停留一次(窗口功能),我可以很容易地做到这一点。但是,当它在整个旅程中多次停止和启动时,我遇到了麻烦。窗口函数可以处理这个吗?
这是我正在处理的数据和预期输出的示例:
val df = Seq(
(123, "UNDERWAY", 0),
(123, "ANCHORED", 12), // first anchored (first time around)
(123, "ANCHORED", 20), //take this timestamp and sub from previous
(123, "UNDERWAY", 32),
(123, "UNDERWAY", 44),
(123, "ANCHORED", 50), // first anchored (second time around)
(123, "ANCHORED", 65),
(123, "ANCHORED", 70), //take this timestamp and sub from previous
(123, "ARRIVED", 79)
).toDF("id", "status", "time")
+---+--------+----+
|id |status |time|
+---+--------+----+
|123|UNDERWAY|0 |
|123|ANCHORED|12 |
|123|ANCHORED|20 |
|123|UNDERWAY|32 |
|123|UNDERWAY|44 |
|123|ANCHORED|50 |
|123|ANCHORED|65 |
|123|ANCHORED|70 |
|123|ARRIVED |79 |
+---+--------+----+
// the resulting output I need is as follows (aggregation of total time spent at anchor)
// the ship spent 8 hours at anchor the first time, and then spent
// 20 hours at anchor the second time. So total time is 28 hours
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28 |
+---+-----------------+
船停泊的每个“航段”我想计算停泊时间,然后将所有这些航段加起来以获得停泊总时间。
解决方案
我是Window
函数新手,所以它可能会做得更好,但这是我想出的:
此解决方案仅查看“this - previous”,而不是每个“组”状态中的“last - first”。但是,净效果应该是相同的,因为无论如何它都会将它们加在一起。
import org.apache.spark.sql.expressions.Window
val w = Window.orderBy($"time")
df.withColumn("tdiff", when($"status" === lag($"status", 1).over(w), $"time" - lag($"time", 1).over(w)))
.where($"status" === lit("ANCHORED"))
.groupBy("id", "status")
.agg(sum("tdiff").as("timeSpentAtAnchor"))
.select("id", "timeSpentAtAnchor")
.show(false)
这使:
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28 |
+---+-----------------+
答案是由来自这个答案的信息形成的。而且,正如那里所说:
注意:由于此示例未使用任何分区,因此可能存在性能问题,在您的真实数据中,如果您的问题可以通过一些变量进行分区,将会很有帮助。
推荐阅读
- mysql - MYSQL - 行到列转置
- average - Grafana 平均间隔
- webpack - 面向 webworker 的 Webpack 不会导入依赖项的浏览器版本
- crystal-lang - 如何在 Crystal 中进行通用记忆?
- python - 使 Tkinter 中的标签可点击并在新的 Web 浏览器中打开 UrL
- javascript - 在 Chrome 中调试 javascript iframe 调用程序
- html - 无法在 Google Chrome 中寻找、倒带 html 音频播放器,它只是不起作用
- node.js - multer req 文件未定义,但为什么呢?
- algorithm - 给定一些数字,如何构建一个包含算术表达式的所有排列的高效树?
- python - 无法将背景图像与静态文件 Django 连接