首页 > 解决方案 > 为什么 Google Cloud Dataflow 无法扩展到启用自动缩放且没有配额限制的目标工作人员

问题描述

我正在运行一个 Dataflow 管道来解析存储在云存储桶中的大约 45000 个文本文件。解析后的文本将转换为 JSON 并写入文本文件,以便随后在 BigQuery 中加载(不是管道的一部分)。管道启动几分钟后,目标工作人员的数量增加到 > 30(运行之间的确切数字略有不同),但实际工作人员的数量仍然停留在 1。

我检查过的事情:

如果我让管道运行,它会在大约 2 小时内成功完成,但我希望如果实际工作人员能够扩展到目标,这可以运行得更快。

这是代码的相关部分:

client = storage.Client()
blobs = client.list_blobs(bucket_name)

rf = [b.name for b in blobs]

with beam.Pipeline(options=pipeline_options) as p:
    json_list = (p | 'Create filelist' >> beam.Create(rf)
                   | 'Get string' >> beam.Map(getstring)
                   | 'Filter empty strings' >> beam.Filter(lambda x: x != "")
                   | 'Get JSON' >> beam.Map(getjson)
                   | 'Write output' >> WriteToText(known_args.output))

关于是什么阻止工人扩大规模的任何建议?

标签: pythongoogle-cloud-platformgoogle-cloud-storagegoogle-cloud-dataflowapache-beam

解决方案


这里的问题是此管道中没有可用的并行性。Create 转换是单分片的,管道中的所有其他内容都与它融合在一起。使用像ReadFromText这样的内置文件读取转换可以解决这个问题,或者您可以在 Create 之后放置一个Reshuffle转换,以便将管道分成两个单独的阶段。


推荐阅读