首页 > 解决方案 > Google Dataflow 和 Pubsub - 无法实现一次性交付

问题描述

我正在尝试使用 Google Dataflow 和使用 Apache Beam SDK 2.6.0 的 PubSub 实现一次性交付。

用例非常简单:

“生成器”数据流作业向 PubSub 主题发送 1M 条消息。

GenerateSequence
          .from(0)
          .to(1000000)
          .withRate(100000, Duration.standardSeconds(1L));

“存档”数据流作业从 PubSub 订阅中读取消息并保存到 Google Cloud Storage。

pipeline
        .apply("Read events",
            PubsubIO.readMessagesWithAttributes()
                // this is to achieve exactly-once delivery
                .withIdAttribute(ATTRIBUTE_ID)
                .fromSubscription('subscription')
                .withTimestampAttribute(TIMESTAMP_ATTRIBUTE))
        .apply("Window events",
            Window.<Dto>into(FixedWindows.of(Duration.millis(options.getWindowDuration())))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                .withAllowedLateness(Duration.standardMinutes(15))
                .discardingFiredPanes())
        .apply("Events count metric", ParDo.of(new CountMessagesMetric()))
        .apply("Write files to archive",
            FileIO.<String, Dto>writeDynamic()
                .by(Dto::getDataSource).withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.of((msg, ctx) -> msg.getData(), Requirements.empty()), TextIO.sink())
                .to(archiveDir)
                .withTempDirectory(archiveDir)
                .withNumShards(options.getNumShards())
                .withNaming(dataSource ->
                    new SyslogWindowedDataSourceFilenaming(dataSource, archiveDir, filenamePrefix, filenameSuffix)
                ));

我在 Pubsub.IO.Write('Generator' 作业)和 PubsubIO.Read('Archive' 作业)中都添加了 'withIdAttribute',并期望它能够保证精确一次的语义。

我想测试“负面”情况:

  1. “生成器”数据流作业向 PubSub 主题发送 1M 条消息。
  2. “存档”数据流作业开始工作,但我在处理过程中单击“停止作业”->“排水”将其停止。部分消息已处理并保存到 Cloud Storage,例如 40 万条消息。
  3. 我再次开始“存档”工作,并且确实希望它会占用未处理的消息(600K),最终我会看到准确的 1M 消息保存到存储中。

实际上我得到的 - 所有消息都被传递(至少一次被实现),但最重要的是有很多重复 - 每 1M 消息大约 30-50K。

是否有任何解决方案可以实现一次性交付?

标签: google-cloud-platformgoogle-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


数据流不允许您跨运行保持状态。如果您使用 Java,您可以以一种不会导致其丢失现有状态的方式更新正在运行的管道,从而允许您跨管道版本进行重复数据删除。

如果这对您不起作用,您可能希望以一种由 ATTRIBUTE_ID 键入的方式归档消息,例如。Spanner或 GCS 使用它作为文件名。


推荐阅读