scala - Scala Spark 跨团队跨度和密钥按时间段求和
问题描述
我有一个非常类似于如何在 Spark SQL 中按时间间隔分组的问题
但是,我的指标是花费的时间(duration
),所以我的数据看起来像
KEY |Event_Type | duration | Time
001 |event1 | 10 | 2016-05-01 10:49:51
002 |event2 | 100 | 2016-05-01 10:50:53
001 |event3 | 20 | 2016-05-01 10:50:55
001 |event1 | 15 | 2016-05-01 10:51:50
003 |event1 | 13 | 2016-05-01 10:55:30
001 |event2 | 12 | 2016-05-01 10:57:00
001 |event3 | 11 | 2016-05-01 11:00:01
有没有办法将花费的时间汇总到按键分组的五分钟存储桶中,并知道持续时间何时超出存储桶的范围?
例如,第一行从 10:49:51 开始,到 10:50:01 结束 因此,window 中键 001 的存储桶[2016-05-01 10:45:00.0,2016-05-01 10:50:00.0]
将获得 8 秒的持续时间(51 秒到 60 秒)和 10:50到 10:55 将获得 2 秒的持续时间,加上其他日志行的相关秒数(第三行 20 秒,第四行 15 秒)。
我想对特定存储桶中的时间求和,但另一个线程上的解决方案
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
会在存储桶中重叠存储桶的时间戳记高估,并低估后续存储桶
注意:我的Time
列也是 Epoch 时间戳1636503077
,但如果这样计算更容易,我可以轻松地将其转换为上述格式。
解决方案
在我看来,也许您需要通过将持续时间溢出到每分钟(或每五分钟)来预处理数据。
如你所愿,第一行
001 |event1 | 10 | 2016-05-01 10:49:51
应该转换为
001 |event1 | 9 | 2016-05-01 10:49:51
001 |event1 | 1 | 2016-05-01 10:50:00
然后您可以使用火花窗口函数对其进行正确求和。
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
如果您只想知道时间段的持续时间,这不会改变结果,但会增加记录数。
推荐阅读
- python - 在pyspark中用平均值填充缺失值
- php - 属性 [id] 不存在 laravel
- python - 在没有 y 坐标的 Matplotlib 图中添加文本
- javascript - 根据数组对象数据创建一个新数组
- date - 两个日期之间的序列,仅按指定的工作日,跳过所选的周数
- c++ - 如何为这个函数制作 lambda?
- node.js - “gulp serve”正在返回我无法将 Web 部件添加到的 workbench.html
- batch-file - 许多启动命令在同一个窗口中批处理
- javascript - innerHTML 赋值删除空标签中的所有(包括最后一个)空格
- python - 我可以在我的 android 应用程序中包含 linux 可执行文件吗?