首页 > 解决方案 > 将相同的用户事件聚合到两个不同的窗口中

问题描述

我正在尝试使用 Apache Beam 编写一个简单的管道。假设我正在接受看起来像这样的用户请求:

(country, user_id, score, timestamp)

我只想总结每个国家所有用户的总分,每分钟和 10 分钟累积。但是,需要注意的是,我想为每个用户、每个存储桶获取最新分数。意思是,如果我有两条记录:

('USA', 1, 10, 62)
('USA', 1, 4, 64)

并假设它们映射到同一个分钟桶,我想保留第二条记录(后面的一条得分为 4)。

我如何有效地做到这一点?现在,我正在获取用户事件流并将其传送到两个单独的分支——一个每分钟计算一次聚合,一个每 10 分钟计算一次。显然,很多计算在这里加倍。理想情况下,我们将能够重新使用每 1 分钟窗口的计算来加起来 10 分钟窗口,但我不太清楚如何做到这一点。

谢谢!

标签: pythonstreaminggoogle-cloud-dataflowapache-beam

解决方案


将元素通过管道传输到两个不同的分支可能没什么大不了的,但是是的,您可以通过避免重复聚合的方式来执行此操作。

假设您的 10 分钟和 1 分钟窗口可以均匀地相互转换(固定时间窗口应该可以正常工作),您可以执行以下操作:

Assign 1 min. windows -> Aggregate -> Assign 10 min. windows -> Aggregate

在第一次聚合之后(可能是某种类型的组合),结果元素应该具有组合元素的最新时间戳(可以通过更改TimestampCombiner来修改)。这意味着只要窗口在从一个转换到另一个时均匀排列,第二个聚合应该聚合与原始方法相同的所有数据。

对于您问题的第二部分,要保留窗口的最新时间戳元素并删除其他元素,您需要实现一个自定义CombineFn来保留最新元素。现在,为了从 CombineFn 中实际读取元素的时间戳,您首先需要使用Reify.timestamps将时间戳附加到元素。而且您可能希望您的 CombineFn 输出您的原始元素类型而没有时间戳。所以总的来说它看起来像这样(方括号中的PCollections,所以你可以看到类型):

[ElementT] -> Reify.timestamps -> [TimestampedValue<ElementT>] -> Combine -> [ElementT]


推荐阅读