duplicates - 即使使用 withIdAttribute 也会在 Apache Beam / Dataflow 输入上重复
问题描述
我正在尝试将来自 3rd 方 API 的数据提取到 Dataflow 管道中。由于第 3 方不提供 webhook,我编写了一个自定义脚本,不断轮询其端点以获取更多数据。
数据每 15 分钟刷新一次,但由于我不想错过任何数据点并且我想在新数据可用时立即使用,因此我的“爬虫”每 1 分钟运行一次。然后脚本将数据发送到 PubSub 主题。很容易看出,PubSub 将为源中的每个数据点收到大约 15 条重复消息。
我第一次尝试识别和丢弃这些重复的消息是为每个 PubSub 消息 ( eventid
) 添加一个自定义属性,该属性是从源的 [ID + updated_time] 的哈希创建的。
const attributes = {
eventid: Buffer.from(`${item.lastupdate}|${item.segmentid}`).toString('base64'),
timestamp: item.timestamp.toString()
};
const dataBuffer = Buffer.from(JSON.stringify(item))
publisher.publish(dataBuffer, attributes)
然后我配置了 Dataflow withIdAttribute()
(这是新的idLabel()
,基于Record IDs)。
PCollection<String> input = p
.apply("ReadFromPubSub", PubsubIO
.readStrings()
.fromTopic(String.format("projects/%s/topics/%s", options.getProject(), options.getIncomingDataTopic()))
.withTimestampAttribute("timestamp")
.withIdAttribute("eventid"))
.apply("OutputToBigQuery", ...)
通过该实现,我期望当脚本第二次发送相同的数据点时,重复的数据点将eventid
相同并且消息被丢弃。但由于某种原因,我仍然在输出数据集上看到重复项。
一些问题:
- 如果他们不提供 webhook,是否有一种聪明的方法可以从该 3rd 方 API 将数据摄取到数据流中?
- 关于为什么数据流不丢弃这种情况下的消息的任何想法?
- 我知道对数据流进行重复数据删除的 10 分钟限制,但即使在第二次插入(2 分钟)时我也看到重复的数据。
任何帮助将不胜感激!
解决方案
推荐阅读
- javascript - 更改路由时如何在 React 组件的 CSS 样式之间切换?
- typescript - Ionic 4 离子输入只允许数字,限制字母和特殊字符
- python - 如何使独立编译的 cython 包使用共享随机数生成器?
- azure - 停止在 Azure 订阅上创建资源
- rstudio - 如何在我的 rstudio 针织 html 中有两行标题
- redis - 我如何找出我的密钥存储在 Redis Cache/db 中的时间?
- selenium - ChromeOptions --headless 在 Selenium 3.5.3 中无效
- javascript - 单击jquery中的添加行时如何更新输入字段中的值?
- ansible - 是否阻止它们内部的ansible支持条件
- c# - 在 C# 中动态寻址 JSON