首页 > 解决方案 > DirectRunner 不会按照我在 Beam Java SDK 中使用 FixedWindows 指定的方式从 Pub/Sub 读取

问题描述

我目前正在使用 Apache Beam Java SDK 2.8.0 从 Pub/Sub 读取流数据的 Dataflow 管道。该管道只是来自 Google 的 PubsubToText.java 模板。

https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java

虽然使用 DataflowRunner 部署到云按预期工作,但它无法使用 DirectRunner 正确运行,即当我在本地环境中工作时,这使得开发管道变得非常困难。

例如,当我将 FixedWindows 速率设置为 30 秒时,云上的 Dataflow Runner 每 30 秒生成一次文件,这是预期的。

但是,当我在本地环境中为 DirectRunner 设置相同的速率时,它不会每 30 秒发出一次文件。相反,它以不稳定的方式生成文件。

例如,它在4分钟后发出第一个数据并创建8个应该已经创建的文件实际上是一次生成的,5分钟后下一个,3分钟后下一个,......等等,这使得本地开发过程极其耗时且令人沮丧。

我为什么要观察这个?

将 Java SDK 从 8 切换到 11,将 Beam SDK 从 2.8.0 切换到 2.9.0 或 2.10.0,将环境从本地切换到 GCE 实例,以及从 GCS 到本地的管道输出都没有帮助。

这是重现问题的全部内容:

  1. git clone https://github.com/GoogleCloudPlatform/DataflowTemplates
  2. 从 pom.xml 中删除<scope>test</scope>beam-runners-direct-java 的行,使其在运行时支持 DirectRunner。
  3. 按照https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java上的建议编译和运行程序,但将 runner 更改为DirectRunner 和添加--outputShardTemplate=W-P-SS-of-NN,这是一个省略的选项,在本地运行时需要。
  4. 同时删除--project--stagingLocationtempLocation行,因为它不会部署到云端。
  5. 发出文件需要很长时间,尽管我设置了,例如,windowDuration=30s

我怀疑这是与 Pub/Sub 相关的问题,但是当我运行 tcpdump 时,它开始连接到 Pub/Sub 并立即拉取数据。这可能是 DirectRunner 特定的问题。

标签: javagoogle-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


虽然我不知道为什么会发生这种情况,但我找到了解决这个问题的方法。虽然DataflowRunner不需要您为其设置触发器以使其按预期工作,但您必须为DirectRunner.

附加.trrigering到 Window.into,问题就消失了。


推荐阅读