首页 > 解决方案 > 控制 Apache Beam 数据流管道中的并行性

问题描述

我们正在尝试使用 Apache Beam(使用 Go SDK)和 Dataflow 来并行化我们一项耗时的任务。对于更多上下文,我们有缓存作业,它需要一些查询,跨数据库运行并缓存它们。每个数据库查询可能需要几秒钟到几分钟,我们希望并行运行这些查询以更快地完成任务。

创建了一个如下所示的简单管道:

    // Create initial PCollection.
    startLoad := beam.Create(s, "InitialLoadToStartPipeline")

    // Emits a unit of work along with query and date range.
    cachePayloads := beam.ParDo(s, &getCachePayloadsFn{Config: config}, startLoad)

    // Emits a cache response which includes errCode, errMsg, time etc.
    cacheResponses := beam.ParDo(s, &cacheQueryDoFn{Config: config}, cachePayloads)

    ...

排放的数量单位getCachePayloadsFn并不多,并且在生产中大多为数百,最多为数千。

现在的问题cacheQueryDoFn不是并行执行,而是一个接一个地依次执行查询。我们通过输入日志StartBundleProcessElement在缓存函数中记录 goroutine id、进程 id、开始和结束时间等来确认这一点,以确认执行中没有重叠。

即使只有 10 个查询,我们也希望始终并行运行查询。根据我们的理解和文档,它从整体输入创建捆绑包,这些捆绑包并行运行,并且在捆绑包内按顺序运行。有没有办法控制负载的捆绑数量或增加并行度的任何方法?

我们尝试过的事情:

  1. 保持num_workers=2autoscaling_algorithm=None。它启动两个虚拟机,但Setup只在一个虚拟机上运行初始化 DoFn 的方法,并将其用于整个负载。
  2. 在这里找到sdk_worker_parallelism选项。但不确定如何正确设置它。尝试将其设置为. 没有效果。beam.PipelineOptions.Set("sdk_worker_parallelism", "50")

标签: gogoogle-cloud-dataflowapache-beam

解决方案


默认情况下,Create 不是并行的,所有的 DoFn 都与 Create 融合到同一阶段,因此它们也没有并行性。有关更多信息,请参阅https://beam.apache.org/documentation/runtime/model/#dependent-parallellism 。

您可以使用Reshuffle转换显式强制融合中断。


推荐阅读