首页 > 解决方案 > 如何使用 TextIO 读取文本文件并返回附加输入字段?

问题描述

我有一个 KV 的 PCollection,其中键是文件名,值是文件的一些附加信息(例如,生成文件的“源”系统)。例如,

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

我需要从文件中读取所有行并使用“源”字段,以 KV PCollection 的形式返回。

KV(line1 from X1.dat, "SourceX")
KV(line2 from X1.dat, "SourceX")
...
KV(line1 from Y1.dat, "SourceY")

我可以通过调用 FileIO.match() 来实现这一点,然后是 DoFn,在其中我顺序读取文件并附加 SourceX(从 SideInput 中传递的映射中检索)。

为了获得并行阅读的好处,我可以使用 TextIO.readAll() 来实现这一点吗?TextIO.read() 返回一个 PCollection,没有文件名信息。我怎样才能将它加入到文件名到源映射的映射中?尝试了 WithKeys 传输,但无法正常工作...

标签: google-cloud-platformgoogle-cloud-dataflowapache-beamdataflow

解决方案


目前FileIO.match()按您的方式使用是完成此操作的最佳方式,但是一旦合并https://github.com/apache/beam/pull/12645,您将能够使用新的ContextualTextIO转换。

请注意,以分布式方式计算行号本质上是昂贵的;您可能想看看是否可以使用偏移量(计算起来更容易,并且与行号相同)。


推荐阅读