google-cloud-platform - Google Dataflow 和 Pubsub - 无法实现一次性交付
问题描述
我正在尝试使用 Google Dataflow 和使用 Apache Beam SDK 2.6.0 的 PubSub 实现一次性交付。
用例非常简单:
“生成器”数据流作业向 PubSub 主题发送 1M 条消息。
GenerateSequence
.from(0)
.to(1000000)
.withRate(100000, Duration.standardSeconds(1L));
“存档”数据流作业从 PubSub 订阅中读取消息并保存到 Google Cloud Storage。
pipeline
.apply("Read events",
PubsubIO.readMessagesWithAttributes()
// this is to achieve exactly-once delivery
.withIdAttribute(ATTRIBUTE_ID)
.fromSubscription('subscription')
.withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
.apply("Window events",
Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.standardMinutes(15))
.discardingFiredPanes())
.apply("Events count metric", ParDo.of(new CountMessagesMetric()))
.apply("Write files to archive",
FileIO.<String, Dto>writeDynamic()
.by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
.to(archiveDir)
.withTempDirectory(archiveDir)
.withNumShards(options.getNumShards())
.withNaming(dataSource ->
new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
));
我在 Pubsub.IO.Write('Generator' 作业)和 PubsubIO.Read('Archive' 作业)中都添加了 'withIdAttribute',并期望它能够保证精确一次的语义。
我想测试“负面”情况:
- “生成器”数据流作业向 PubSub 主题发送 1M 条消息。
- “存档”数据流作业开始工作,但我在处理过程中单击“停止作业”->“排水”将其停止。部分消息已处理并保存到 Cloud Storage,例如 40 万条消息。
- 我再次开始“存档”工作,并且确实希望它会占用未处理的消息(600K),最终我会看到准确的 1M 消息保存到存储中。
实际上我得到的 - 所有消息都被传递(至少一次被实现),但最重要的是有很多重复 - 每 1M 消息大约 30-50K。
是否有任何解决方案可以实现一次性交付?
解决方案
数据流不允许您跨运行保持状态。如果您使用 Java,您可以以一种不会导致其丢失现有状态的方式更新正在运行的管道,从而允许您跨管道版本进行重复数据删除。
如果这对您不起作用,您可能希望以一种由 ATTRIBUTE_ID 键入的方式归档消息,例如。Spanner或 GCS 使用它作为文件名。
推荐阅读
- ios - Swift:如何为数组中的变量赋值?
- javascript - map函数在useEffect中运行后如何设置值
- c# - 除了作为访问修饰符和序列化字段之外,“公共”的统一功能是什么?
- c++ - 白色方形 SFML
- javascript - NODEJS:从对话框 openDirectory 返回数组数组
- android - Android 主题 - statusBarColor 项目上的“无法解析符号”错误
- swift - 如何使用 TextField 更新全局变量?
- mysql - 实体框架和 MySQL 中的求和错误(Sum)
- python - 通过套接字以原始表示形式写入连续的 numpy 数组
- kubernetes - 从 VS Code 到 Kubernetes 的连接失败