python - 将相同的用户事件聚合到两个不同的窗口中
问题描述
我正在尝试使用 Apache Beam 编写一个简单的管道。假设我正在接受看起来像这样的用户请求:
(country, user_id, score, timestamp)
我只想总结每个国家所有用户的总分,每分钟和 10 分钟累积。但是,需要注意的是,我想为每个用户、每个存储桶获取最新分数。意思是,如果我有两条记录:
('USA', 1, 10, 62)
('USA', 1, 4, 64)
并假设它们映射到同一个分钟桶,我想保留第二条记录(后面的一条得分为 4)。
我如何有效地做到这一点?现在,我正在获取用户事件流并将其传送到两个单独的分支——一个每分钟计算一次聚合,一个每 10 分钟计算一次。显然,很多计算在这里加倍。理想情况下,我们将能够重新使用每 1 分钟窗口的计算来加起来 10 分钟窗口,但我不太清楚如何做到这一点。
谢谢!
解决方案
将元素通过管道传输到两个不同的分支可能没什么大不了的,但是是的,您可以通过避免重复聚合的方式来执行此操作。
假设您的 10 分钟和 1 分钟窗口可以均匀地相互转换(固定时间窗口应该可以正常工作),您可以执行以下操作:
Assign 1 min. windows -> Aggregate -> Assign 10 min. windows -> Aggregate
在第一次聚合之后(可能是某种类型的组合),结果元素应该具有组合元素的最新时间戳(可以通过更改TimestampCombiner来修改)。这意味着只要窗口在从一个转换到另一个时均匀排列,第二个聚合应该聚合与原始方法相同的所有数据。
对于您问题的第二部分,要保留窗口的最新时间戳元素并删除其他元素,您需要实现一个自定义CombineFn来保留最新元素。现在,为了从 CombineFn 中实际读取元素的时间戳,您首先需要使用Reify.timestamps将时间戳附加到元素。而且您可能希望您的 CombineFn 输出您的原始元素类型而没有时间戳。所以总的来说它看起来像这样(方括号中的PCollections,所以你可以看到类型):
[ElementT] -> Reify.timestamps -> [TimestampedValue<ElementT>] -> Combine -> [ElementT]
推荐阅读
- c# - 如何从稍后在方程式中传递的单个文本框中获取 2 个不同的浮点数。(C#)
- ios - 与蜂窝网络相比,在 wifi 上的 React-native 获取速度非常慢。在开发和生产中,无论运行/不是调试器
- kubernetes - 无法在谷歌云中使用 kubernetes 公开部署
- multithreading - 在 Lparallel 库中使用队列(Common Lisp)
- android - Xamarin.Android 设计器是阿拉伯语的 VS 错误
- php - 上传文件 PHP EC2 不工作没有错误
- ruby - 使用 Ruby gets.chomp 方法向用户询问多个问题
- type-theory - 分离函数的联合类型?
- google-sheets - 在 Google 表格中查找奇数计数的公式
- r - 检查字符是否在数据框中