首页 > 解决方案 > Apache Beam - 并行化 Google Cloud Storage Blob 下载,同时保持 Blob 分组

问题描述

我希望能够在单个PCollection元素中维护一组实体,但并行从 Google Cloud Storage (GCS) 获取这些实体。即PCollection<Iterable<String>>-->PCollection<Iterable<String>>其中开始PCollection是文件路径的PCollection可迭代,结果是文件内容的可迭代。或者,PCollection<String> --> PCollection<Iterable<String>>也可以工作,甚至可能更可取,其中开始PCollection是 glob 模式,结果PCollection是与 glob 匹配的文件内容的可迭代。

我的用例是在我的管道中的某个点上,我有 input PCollection<String>。的每个元素PCollection都是一个 GCS 全局模式。将匹配 glob 的文件组合在一起很重要,因为文件的内容(一旦读取组中的所有文件)需要在管道的下游进行分组。我最初尝试使用FileIO.matchAll 和随后的GroupByKey. 但是,matchAll、window 和GroupByKey组合无法保证在执行 GroupByKey 转换之前将读取与 glob 匹配的所有文件并在同一个窗口中(尽管我可能误解了 Windowing)。如果时间跨度大,就有可能达到预期的效果WindowFn已应用,但它仍然是概率性的,而不是保证在分组之前将读取所有文件。保持尽可能低的延迟也是我的管道的主要目标。

所以我的下一个并且目前正在运行的计划是使用一个AsyncHttpClient通过 GCS HTTP API 扇出获取文件内容的计划。我觉得这违背了 Beam 的原则,并且在并行化方面可能不是最佳的。

所以我开始调查 SplittableDoFn 。我目前的计划是允许拆分,以便可以单独处理输入 Iterable 中的每个实体(即来自 glob 模式的每个匹配文件)。我已经能够修改FileIO#MatchFn在 Java SDK 中定义)以提供PCollection<String> -> PCollection<Iterable<String>>在 GCS glob 模式的输入和 glob 匹配的 Iterable 输出之间进行转换的机制。

我遇到的挑战是:如何将拆分调用分组/收集回我的单个输出值DoFn?我尝试过使用有状态处理并使用 aBagState来收集文件内容,但我意识到拆分表的ProcessElement方法DoFn可能只接受ProcessContextRestriction元组,并且没有其他参数,因此没有StateId引用 a 的参数StateSpec(抛出无效运行时的参数错误)。

我在FilePatternWatcher官方SDF 提案文档中的示例中注意到,创建了一个自定义跟踪器,其中FilePath对象保存在一个集合中,并可能通过tryClaim. 这似乎适用于我的用例,但我看不到/不明白如何@SplitRestriction使用 custom 来实现方法RestrictionTracker

如果有人能够提供建议,我将不胜感激。我对任何特定的解决方案没有偏好,只是我想实现在单个PCollection元素中维护一组实体的能力,但同时从谷歌云存储 (GCS) 中获取这些实体。

标签: google-cloud-dataflowapache-beam

解决方案


加入输出 PCollections 对您有帮助吗?

PCollectionList
    .of(collectionOne)
    .and(collectionTwo)
    .and(collectionThree)
    ...
    .apply(Flatten.pCollections())

推荐阅读