首页 > 解决方案 > 我们可以在 ParDo 函数中编写 ParDo 函数吗?

问题描述

例如,我有一个 URL 列表作为字符串存储在 Datastore 中。因此,我使用了 DatastoreIO 函数并将它们读入 PCollection。在 ParDo 的 DoFn 中,对于每个 URL(这是文件的 GCP 云存储位置),我必须读取该位置中存在的文件并进行进一步的转换。

所以我想知道我是否可以在 ParDo 函数中为 PCollections 编写 ParDo。每个文件转换的并行执行类型并发送 KV (key, PCollection) 作为第一个 ParDo 函数的输出。

抱歉,如果我没有清楚地展示我的场景。我是 Apache Beam 和 Google Dataflow 的新手

标签: google-cloud-platformgoogle-cloud-dataflowapache-beam

解决方案


你想要的是TextIO#readAll()

PCollection<String> urls = pipeline.apply(DatastoreIO.read(...))
PCollection<String> lines = urls.apply(TextIO.readAll())

推荐阅读