首页 > 解决方案 > 当在此之前添加 Reshuffle() 时,BatchElements() 适用于 DirectRunner,但不适用于 DataflowRunner

问题描述

在 Apache Beam 中,BatchElements()使用 DirectRunner 运行时,将元素分组为指定大小为 100 的批次。但 DataflowRunner 上的相同代码将元素分组为 1 个批次。我Reshuffle()之前有BatchElements().

没有Reshuffle()它,它在两个跑步者上都能按预期工作。

                p
                | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=subscription)
                | "Parse as List" >> Map(lambda element: json.loads(element))
                | "Flatten" >> FlatMap(lambda elements: elements)
                | Reshuffle()
                | "Typed Element" >> ParDo(TypedElement())
                | "Batch Typed Element"
                >> BatchElements(
                    min_batch_size=50, max_batch_size=50, target_batch_duration_secs=None
                )

有没有人遇到过类似的问题?任何建议将不胜感激!几天来我一直在努力解决这个问题。

标签: google-cloud-dataflowapache-beam

解决方案


推荐阅读