google-cloud-storage - 在 Apache Beam / Dataflow Python 流中写入文本文件
问题描述
我有一个非常基本的 Python Dataflow 作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google Cloud Storage。
transformed = ...
transformed | beam.io.WriteToText(known_args.output)
输出被写入--output中特定的位置,但只是临时阶段,即
gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...
该文件永远不会使用分片模板放置到正确命名的位置。
在本地和 DataFlow 运行器上测试。
进一步测试时,我注意到 streaming_wordcount 示例也有同样的问题,但是标准 wordcount 示例写得很好。也许问题在于窗口化或从 pubsub 读取?
看来 WriteToText 与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。
解决方案
Python SDK 中的WriteToText
转换不支持流式传输。
相反,您可以考虑apache_beam.io.fileio
. 在这种情况下,您可以编写如下内容(假设 10 分钟窗口):
my_pcollection = (p | ReadFromPubSub(....)
| WindowInto(FixedWindows(10*60))
| fileio.WriteToFiles(path=known_args.output))
这足以为每个窗口写出单独的文件,并随着流的前进继续这样做。
你会看到这样的文件(假设输出是gs://mybucket/
)。当窗口被触发时,文件将被打印:
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...
默认情况下,文件具有$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix
名称 -output
默认情况下,前缀是其中的前缀,但您可以传递更复杂的文件命名函数。
如果您想自定义文件的写入方式(例如文件的命名,或数据的格式,或类似的东西),您可以查看WriteToFiles
.
您可以在此处查看 Beam 测试中使用的转换示例,其中包含更复杂的参数 - 但听起来默认行为对您来说应该足够了。
推荐阅读
- kubernetes - Calico:networkPlugin cni 设置 pod 失败,i/o 超时
- amazon-s3 - 哪种架构最适合创建无服务器 aws 服务?
- python - 添加一个计数器作为数据框的索引
- python - matplotlib plt 未在散点图上正确设置 xlabel/ylabel
- sql - 计算两个时间戳之间的差异
- cmake - 如何让 CMake 与外部库链接?
- websphere - Websphere 自由:计时器管理器
- kotlin - 如何在 Kotlin 中加载
- r - 使用 purrr 进行真实数据分析时的一个问题
- assimp - numWeights 对应于 mnumVertices?