首页 > 解决方案 > 如何向流式管道添加重复数据删除 [apache-beam]

问题描述

我在 apache Beam [python] 中有一个工作流管道,它从 pub/sub 摄取数据,在数据流中执行丰富并将其传递给大查询。

使用流窗口,我想确保消息不会重复(因为 pub/sub 保证至少一次传递)。

所以,我想我只是使用与梁不同的方法,但是一旦我使用它,我的管道就会中断(无法继续进行,任何本地打印也不可见)。

这是我的管道代码:

    with beam.Pipeline(options=options) as p:
        message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
                   with_output_types(bytes))

        bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                           | "Deduplication" >> beam.Distinct()
                           | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                           | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                           | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                           | "PreProcessing" >> beam.ParDo(PreProcessing())
                           | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
                   )

        if not known_args.local:
            bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
        else:
            bq_data | "Display" >> beam.ParDo(Display())

正如您在重复数据删除标签中看到的那样,我正在调用 beam.Distinct 方法。

问题:

  1. 重复数据删除应该在管道中的哪里进行?

  2. 这甚至是正确/理智的方法吗?

  3. 我还能如何对流缓冲区数据进行重复数据删除?

  4. 是否需要重复数据删除,还是我只是在浪费时间?

任何解决方案或建议将不胜感激。谢谢。

标签: pythongoogle-cloud-platformgoogle-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


您可能会发现这篇关于Exactly-once processing 的博客很有帮助。首先,Dataflow 已经根据发布/订阅记录 ID 执行重复数据删除。然而,正如博客所述:“然而,在某些情况下,这还不够。用户的发布过程可能会重试发布”。

因此,如果将消息发布到 Pub/Sub 的系统可能多次发布同一消息,那么您可能希望添加自己的确定性记录 ID。然后 Cloud Dataflow 会检测到这些。这是我推荐的方法,而不是尝试在您自己的管道中进行重复数据删除。

您可以通过使用PubSubIO.Read上的 withIdAttribute 来执行此操作。例子

关于为什么我相信 Distinct 会导致卡顿的一些解释。Distinct尝试对 Window 中的数据进行重复数据删除。我相信您正在尝试对全局窗口进行重复数据处理,因此您的管道必须缓冲和比较所有元素,因为这是一个无界的 PCollection。它将尝试永远缓冲。

我相信如果您首先执行窗口化,并且您具有确定性的事件时间戳(看起来您使用的不是 withTimestampAttribute),这将正常工作。然后 Distinct 将仅应用于窗口内的元素(并且具有相同时间戳的相同元素将被放置在同一个窗口中)。您可能想看看这是否适用于原型设计,但我建议尽可能添加唯一的记录 ID,并允许 Dataflow 基于记录 ID 处理重复以获得最佳性能。


推荐阅读