首页 > 解决方案 > Scala Spark 跨团队跨度和密钥按时间段求和

问题描述

我有一个非常类似于如何在 Spark SQL 中按时间间隔分组的问题

但是,我的指标是花费的时间(duration),所以我的数据看起来像

KEY |Event_Type | duration | Time 
001 |event1     | 10     | 2016-05-01 10:49:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

有没有办法将花费的时间汇总到按键分组的五分钟存储桶中,并知道持续时间何时超出存储桶的范围?

例如,第一行从 10:49:51 开始,到 10:50:01 结束 因此,window 中键 001 的存储桶[2016-05-01 10:45:00.0,2016-05-01 10:50:00.0]将获得 8 秒的持续时间(51 秒到 60 秒)和 10:50到 10:55 将获得 2 秒的持续时间,加上其他日志行的相关秒数(第三行 20 秒,第四行 15 秒)。

我想对特定存储桶中的时间求和,但另一个线程上的解决方案 df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") 会在存储桶中重叠存储桶的时间戳记高估,并低估后续存储桶

注意:我的Time列也是 Epoch 时间戳1636503077,但如果这样计算更容易,我可以轻松地将其转换为上述格式。

标签: scalaapache-spark

解决方案


在我看来,也许您需要通过将持续时间溢出到每分钟(或每五分钟)来预处理数据。
如你所愿,第一行

001 |event1     | 10     | 2016-05-01 10:49:51

应该转换为

001 |event1     | 9     | 2016-05-01 10:49:51
001 |event1     | 1     | 2016-05-01 10:50:00

然后您可以使用火花窗口函数对其进行正确求和。

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

如果您只想知道时间段的持续时间,这不会改变结果,但会增加记录数。


推荐阅读