google-cloud-dataflow - Apache Beam - 并行化 Google Cloud Storage Blob 下载,同时保持 Blob 分组
问题描述
我希望能够在单个PCollection
元素中维护一组实体,但并行从 Google Cloud Storage (GCS) 获取这些实体。即PCollection<Iterable<String>>
-->PCollection<Iterable<String>>
其中开始PCollection
是文件路径的PCollection
可迭代,结果是文件内容的可迭代。或者,PCollection<String> --> PCollection<Iterable<String>>
也可以工作,甚至可能更可取,其中开始PCollection
是 glob 模式,结果PCollection
是与 glob 匹配的文件内容的可迭代。
我的用例是在我的管道中的某个点上,我有 input PCollection<String>
。的每个元素PCollection
都是一个 GCS 全局模式。将匹配 glob 的文件组合在一起很重要,因为文件的内容(一旦读取组中的所有文件)需要在管道的下游进行分组。我最初尝试使用FileIO.matchAll
和随后的GroupByKey
. 但是,matchAll、window 和GroupByKey
组合无法保证在执行 GroupByKey 转换之前将读取与 glob 匹配的所有文件并在同一个窗口中(尽管我可能误解了 Windowing)。如果时间跨度大,就有可能达到预期的效果WindowFn
已应用,但它仍然是概率性的,而不是保证在分组之前将读取所有文件。保持尽可能低的延迟也是我的管道的主要目标。
所以我的下一个并且目前正在运行的计划是使用一个AsyncHttpClient
通过 GCS HTTP API 扇出获取文件内容的计划。我觉得这违背了 Beam 的原则,并且在并行化方面可能不是最佳的。
所以我开始调查 SplittableDoFn 。我目前的计划是允许拆分,以便可以单独处理输入 Iterable 中的每个实体(即来自 glob 模式的每个匹配文件)。我已经能够修改FileIO#MatchFn
(在 Java SDK 中定义)以提供PCollection<String> -> PCollection<Iterable<String>>
在 GCS glob 模式的输入和 glob 匹配的 Iterable 输出之间进行转换的机制。
我遇到的挑战是:如何将拆分调用分组/收集回我的单个输出值DoFn
?我尝试过使用有状态处理并使用 aBagState
来收集文件内容,但我意识到拆分表的ProcessElement
方法DoFn
可能只接受ProcessContext
和Restriction
元组,并且没有其他参数,因此没有StateId
引用 a 的参数StateSpec
(抛出无效运行时的参数错误)。
我在FilePatternWatcher
官方SDF 提案文档中的示例中注意到,创建了一个自定义跟踪器,其中FilePath
对象保存在一个集合中,并可能通过tryClaim
. 这似乎适用于我的用例,但我看不到/不明白如何@SplitRestriction
使用 custom 来实现方法RestrictionTracker
。
如果有人能够提供建议,我将不胜感激。我对任何特定的解决方案没有偏好,只是我想实现在单个PCollection
元素中维护一组实体的能力,但同时从谷歌云存储 (GCS) 中获取这些实体。
解决方案
加入输出 PCollections 对您有帮助吗?
PCollectionList
.of(collectionOne)
.and(collectionTwo)
.and(collectionThree)
...
.apply(Flatten.pCollections())
推荐阅读
- java - 从 persistence.xml 注入 PersistenceContext
- android - 将 Android 应用程序连接到 AWS 上托管的 MySQL 数据库
- javascript - Sublime Text 3 中 x-template 脚本标签内 HTML 的语法高亮显示
- validation - 如何在 Firebase 中的列表项字段中要求唯一值,而该值不是键?
- spring - 如何验证@QueryParam?
- java - Spring Boot Kafka 应用程序未收到消息
- google-iap - Google Idenity Aware Proxy 在 SPA 上使用 API 进行身份验证
- javascript - 如何将“系列”的值存储为会话变量?
- python - 在 TFTP 实现中解压读取请求的结构
- sql - 在 Postgres 中将数组转换为行