python - 为什么 Google Cloud Dataflow 无法扩展到启用自动缩放且没有配额限制的目标工作人员
问题描述
我正在运行一个 Dataflow 管道来解析存储在云存储桶中的大约 45000 个文本文件。解析后的文本将转换为 JSON 并写入文本文件,以便随后在 BigQuery 中加载(不是管道的一部分)。管道启动几分钟后,目标工作人员的数量增加到 > 30(运行之间的确切数字略有不同),但实际工作人员的数量仍然停留在 1。
我检查过的事情:
- 没有配额限制(通过控制台和作业日志检查)
- 已启用自动缩放
- 单个工作人员的负载约为 80%(Cloud Dataflow 文档提到,如果单个工作人员的负载低于 5%,则禁用自动缩放)
如果我让管道运行,它会在大约 2 小时内成功完成,但我希望如果实际工作人员能够扩展到目标,这可以运行得更快。
这是代码的相关部分:
client = storage.Client()
blobs = client.list_blobs(bucket_name)
rf = [b.name for b in blobs]
with beam.Pipeline(options=pipeline_options) as p:
json_list = (p | 'Create filelist' >> beam.Create(rf)
| 'Get string' >> beam.Map(getstring)
| 'Filter empty strings' >> beam.Filter(lambda x: x != "")
| 'Get JSON' >> beam.Map(getjson)
| 'Write output' >> WriteToText(known_args.output))
关于是什么阻止工人扩大规模的任何建议?
解决方案
这里的问题是此管道中没有可用的并行性。Create 转换是单分片的,管道中的所有其他内容都与它融合在一起。使用像ReadFromText这样的内置文件读取转换可以解决这个问题,或者您可以在 Create 之后放置一个Reshuffle转换,以便将管道分成两个单独的阶段。
推荐阅读
- html - 无论屏幕大小如何,在没有滚动条的情况下在一页中显示 HTML 页面
- node.js - 如何从nodejs中的azure函数更新json文件?
- python - 根据其他列的值有条件地更改系列的值
- r - 使用 return() 提前返回
- r - 如果满足条件,如何在for循环中跳过迭代
- job-scheduling - 为弱扩展测试优先考虑小型 HPC 作业的最佳作业调度程序策略是什么?
- c++ - 在另一个构造函数中分配对象数组,其中类包含 const emember
- c# - 使用 Carter 框架添加模拟提取器而不是真实类
- python - 正则表达式 xml 字符串和打印组
- html - 客户端和服务器站点上的文本区域内容字符数不同