首页 > 解决方案 > 在 Apache Beam / Dataflow Python 流中写入文本文件

问题描述

我有一个非常基本的 Python Dataflow 作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google Cloud Storage。

transformed = ...
transformed | beam.io.WriteToText(known_args.output)

输出被写入--output中特定的位置,但只是临时阶段,即

gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...

该文件永远不会使用分片模板放置到正确命名的位置。

在本地和 DataFlow 运行器上测试。


进一步测试时,我注意到 streaming_wordcount 示例也有同样的问题,但是标准 wordcount 示例写得很好。也许问题在于窗口化或从 pubsub 读取?


看来 WriteToText 与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。

标签: google-cloud-storagegoogle-cloud-dataflowapache-beamgoogle-cloud-pubsub

解决方案


Python SDK 中的WriteToText转换不支持流式传输。

相反,您可以考虑apache_beam.io.fileio. 在这种情况下,您可以编写如下内容(假设 10 分钟窗口):

my_pcollection = (p | ReadFromPubSub(....)
                    |  WindowInto(FixedWindows(10*60))
                    |  fileio.WriteToFiles(path=known_args.output))

这足以为每个窗口写出单独的文件,并随着流的前进继续这样做。

你会看到这样的文件(假设输出是gs://mybucket/)。当窗口被触发时,文件将被打印:

gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...

默认情况下,文件具有$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix名称 -output默认情况下,前缀是其中的前缀,但您可以传递更复杂的文件命名函数。


如果您想自定义文件的写入方式(例如文件的命名,或数据的格式,或类似的东西),您可以查看WriteToFiles.

您可以在此处查看 Beam 测试中使用的转换示例,其中包含更复杂的参数 - 但听起来默认行为对您来说应该足够了。


推荐阅读