go - 控制 Apache Beam 数据流管道中的并行性
问题描述
我们正在尝试使用 Apache Beam(使用 Go SDK)和 Dataflow 来并行化我们一项耗时的任务。对于更多上下文,我们有缓存作业,它需要一些查询,跨数据库运行并缓存它们。每个数据库查询可能需要几秒钟到几分钟,我们希望并行运行这些查询以更快地完成任务。
创建了一个如下所示的简单管道:
// Create initial PCollection.
startLoad := beam.Create(s, "InitialLoadToStartPipeline")
// Emits a unit of work along with query and date range.
cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)
// Emits a cache response which includes errCode, errMsg, time etc.
cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)
...
排放的数量单位getCachePayloadsFn
并不多,并且在生产中大多为数百,最多为数千。
现在的问题cacheQueryDoFn
不是并行执行,而是一个接一个地依次执行查询。我们通过输入日志StartBundle
并ProcessElement
在缓存函数中记录 goroutine id、进程 id、开始和结束时间等来确认这一点,以确认执行中没有重叠。
即使只有 10 个查询,我们也希望始终并行运行查询。根据我们的理解和文档,它从整体输入创建捆绑包,这些捆绑包并行运行,并且在捆绑包内按顺序运行。有没有办法控制负载的捆绑数量或增加并行度的任何方法?
我们尝试过的事情:
- 保持
num_workers=2
和autoscaling_algorithm=None
。它启动两个虚拟机,但Setup
只在一个虚拟机上运行初始化 DoFn 的方法,并将其用于整个负载。 - 在这里找到
sdk_worker_parallelism
选项。但不确定如何正确设置它。尝试将其设置为. 没有效果。beam.PipelineOptions.Set("sdk_worker_parallelism", "50")
解决方案
默认情况下,Create 不是并行的,所有的 DoFn 都与 Create 融合到同一阶段,因此它们也没有并行性。有关更多信息,请参阅https://beam.apache.org/documentation/runtime/model/#dependent-parallellism 。
您可以使用Reshuffle转换显式强制融合中断。
推荐阅读
- python - 有没有办法在jupyter notebook(python)中保存ipycytoscape(基于javascript的对象)?
- kong - 关于Kong中路由匹配原理的问题
- javascript - 带广播频道的电子 - 无法在不透明的来源中创建广播频道
- python - 提高性能 - 附加列表而不是数据框
- r - 如何在 R 中使用 for 循环打印我的变量摘要输出
- javascript - 如何使用 jQuery 中的状态显示 dataTables 中的隐藏数据?
- react-native - 我将如何在我的 React Native 应用程序中制作搜索过滤器栏?
- python - 使用 cluster_centers_ 时没有获得两个坐标
- python - 如何在 locust 作为库中每秒启动新用户
- json - 从配置单元表中的 json 字符串中提取值