python - 如何向流式管道添加重复数据删除 [apache-beam]
问题描述
我在 apache Beam [python] 中有一个工作流管道,它从 pub/sub 摄取数据,在数据流中执行丰富并将其传递给大查询。
使用流窗口,我想确保消息不会重复(因为 pub/sub 保证至少一次传递)。
所以,我想我只是使用与梁不同的方法,但是一旦我使用它,我的管道就会中断(无法继续进行,任何本地打印也不可见)。
这是我的管道代码:
with beam.Pipeline(options=options) as p:
message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
with_output_types(bytes))
bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
| "Deduplication" >> beam.Distinct()
| "JSONLoad" >> beam.ParDo(ReadAsJSON())
| "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
| "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
| "PreProcessing" >> beam.ParDo(PreProcessing())
| "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
)
if not known_args.local:
bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
else:
bq_data | "Display" >> beam.ParDo(Display())
正如您在重复数据删除标签中看到的那样,我正在调用 beam.Distinct 方法。
问题:
重复数据删除应该在管道中的哪里进行?
这甚至是正确/理智的方法吗?
我还能如何对流缓冲区数据进行重复数据删除?
是否需要重复数据删除,还是我只是在浪费时间?
任何解决方案或建议将不胜感激。谢谢。
解决方案
您可能会发现这篇关于Exactly-once processing 的博客很有帮助。首先,Dataflow 已经根据发布/订阅记录 ID 执行重复数据删除。然而,正如博客所述:“然而,在某些情况下,这还不够。用户的发布过程可能会重试发布”。
因此,如果将消息发布到 Pub/Sub 的系统可能多次发布同一消息,那么您可能希望添加自己的确定性记录 ID。然后 Cloud Dataflow 会检测到这些。这是我推荐的方法,而不是尝试在您自己的管道中进行重复数据删除。
您可以通过使用PubSubIO.Read上的 withIdAttribute 来执行此操作。例子。
关于为什么我相信 Distinct 会导致卡顿的一些解释。Distinct尝试对 Window 中的数据进行重复数据删除。我相信您正在尝试对全局窗口进行重复数据处理,因此您的管道必须缓冲和比较所有元素,因为这是一个无界的 PCollection。它将尝试永远缓冲。
我相信如果您首先执行窗口化,并且您具有确定性的事件时间戳(看起来您使用的不是 withTimestampAttribute),这将正常工作。然后 Distinct 将仅应用于窗口内的元素(并且具有相同时间戳的相同元素将被放置在同一个窗口中)。您可能想看看这是否适用于原型设计,但我建议尽可能添加唯一的记录 ID,并允许 Dataflow 基于记录 ID 处理重复以获得最佳性能。
推荐阅读
- reactjs - 渲染前无法获取数据
- ruby-on-rails - 将包含 Hashie::Mash 的字符串转换为实际的 Hashie::Mash
- laravel-5 - 调用未定义函数 Mpdf\\Mpdf()
- python - 为什么我要绘制的代码没有显示带有标题、xlabel 和 ylabel 的输出?
- python - 病毒防护隔离我的 python exe 文件?
- spring - Spring boot 随机崩溃,没有错误
- javascript - 从函数中的子组件调用 prop 不起作用
- java - Java Web 应用程序 HTTPS 仅在 -Djavax.net.debug=all 设置时有效
- javascript - 如何从 .find JS 返回副本?
- html - 我的 html 代码没有显示在我的网站上,可能是什么问题?