google-cloud-datastore - Dataflow 状态处理中的 Exactly-once 语义
问题描述
我们试图在流设置中涵盖以下场景:
- 计算自作业开始以来用户事件的聚合(比如说计数)
- 用户事件的数量是无限的(因此不能只使用本地状态)
我将讨论我们正在考虑的三个选项,其中前两个选项容易丢失数据,而最后一个选项不清楚。我们想更深入地了解最后一个。当然也欢迎替代方法。
谢谢!
方法 1:会话窗口、数据存储和幂等性
- x 秒的滑动窗口
- 按用户 ID 分组
- 更新数据存储
更新数据存储意味着:
- 启动 trx
- 为该用户读取的数据存储
- 合并新信息
- 数据存储写入
- 结束 trx
数据存储条目包含等于滑动窗口时间戳的幂等性 id
问题:
Windows 可以同时触发,然后可以无序处理导致数据丢失(由 Google 确认)
方法:会话窗口、数据存储和状态
- x 秒的滑动窗口
- 按用户 ID 分组
- 更新数据存储
更新数据存储意味着:
- 预检查:检查此键窗口的状态是否为
true
,如果是,则跳过以下步骤 - 启动 trx
- 为该用户读取的数据存储
- 合并新信息
- 数据存储写入
- 结束 trx
- 存储我们处理过的这个关键窗口的状态 (
true
)
重新执行将因此跳过重复更新
问题:
5 到 7 之间的失败不会写入本地状态,导致重新执行并可能计算元素两次。我们可以通过使用多个状态来规避这个问题,但是我们仍然可以删除数据。
方法 3:全局窗口、定时器和状态
基于文章Timely (and Stateful) Processing with Apache Beam,我们将创建:
- 全局窗口
- 按用户 ID 分组
- 在有状态的 DoFn 中缓冲/计数所有传入事件
- 在第一个事件后刷新 x 次。
同花将意味着与方法 1 相同
问题:
对一次性处理和状态的保证尚不清楚。如果在状态中写入元素并且重新执行捆绑包会发生什么?状态是否恢复到该捆绑包之前?
非常感谢任何指向这方面文档的链接。例如,容错如何与定时器一起工作?
解决方案
从您的方法 1 和 2 中,不清楚乱序合并是数据丢失还是问题。我能想到以下几点。
方法1:不要因为乱序问题而立即合并会话窗口聚合。相反,将它们分开存储,并在足够的时间后,您可以按时间戳顺序合并中间结果。
方法 2:将状态移动到事务中。这样,任何临时故障都不会让事务完成并合并数据。会话窗口聚合的后续成功处理不会导致重复计算。
推荐阅读
- php - 如果第一行不为空,如何将值插入第二行?数据库
- javascript - 如何使用 javascript 使用下拉菜单过滤 html 表的数据
- java - Jooq (java) - 方言默认不支持类型类 org.jooq.impl.UnqualifiedName
- ruby - 如何将“email@domain.com”转换为“em***@domain.com”?
- gcc - gcc 编译,得到“clang:错误:不支持的选项'-fopenmp'”
- mongodb - 从 mongodbdump.tar GZ 文件中获取数据到 localhost db
- r - 使用 R 根据子字符串的第 n 次出现有效地分解字符串
- wordpress - 为 Wordpress 中的每个帖子自动创建的民意调查
- amazon-web-services - 如何指向 ALB 的 hesos 而不是 rightscale?
- json - Inno Setup:如何从 JSON 文件的子部分编辑和检索值