首页 > 解决方案 > 即使使用 withIdAttribute 也会在 Apache Beam / Dataflow 输入上重复

问题描述

我正在尝试将来自 3rd 方 API 的数据提取到 Dataflow 管道中。由于第 3 方不提供 webhook,我编写了一个自定义脚本,不断轮询其端点以获取更多数据。

数据每 15 分钟刷新一次,但由于我不想错过任何数据点并且我想在新数据可用时立即使用,因此我的“爬虫”每 1 分钟运行一次。然后脚本将数据发送到 PubSub 主题。很容易看出,PubSub 将为源中的每个数据点收到大约 15 条重复消息。

我第一次尝试识别和丢弃这些重复的消息是为每个 PubSub 消息 ( eventid) 添加一个自定义属性,该属性是从源的 [ID + updated_time] 的哈希创建的。

const attributes = {
         eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
         timestamp: item.timestamp.toString()
      };
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)

然后我配置了 Dataflow withIdAttribute()(这是新的idLabel(),基于Record IDs)。

PCollection<String> input = p
    .apply("ReadFromPubSub", PubsubIO
       .readStrings()
       .fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
       .withTimestampAttribute("timestamp")
       .withIdAttribute("eventid"))
   .apply("OutputToBigQuery", ...)

通过该实现,我期望当脚本第二次发送相同的数据点时,重复的数据点将eventid相同并且消息被丢弃。但由于某种原因,我仍然在输出数据集上看到重复项。

一些问题:

  1. 如果他们不提供 webhook,是否有一种聪明的方法可以从该 3rd 方 API 将数据摄取到数据流中?
  2. 关于为什么数据流不丢弃这种情况下的消息的任何想法?
    • 我知道对数据流进行重复数据删除的 10 分钟限制,但即使在第二次插入(2 分钟)时我也看到重复的数据。

任何帮助将不胜感激!

标签: duplicatesgoogle-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


我认为您走在正确的轨道上,而不是我建议使用时间戳的哈希。更好的方法是使用 Windows。查看此文档,该文档过滤窗口外的数据。

关于额外的重复数据,如果您正在使用拉取订阅并且在处理数据之前达到确认截止日期,则将根据至少一次交付重新发送消息。在这种情况下更改确认期限,默认值为 10 秒。


推荐阅读